/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.replication;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.network.ServerException;
import com.zendesk.maxwell.MaxwellMysqlConfig;
import com.zendesk.maxwell.bootstrap.BootstrapController;
import com.zendesk.maxwell.filtering.Filter;
import com.zendesk.maxwell.monitoring.Metrics;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.MaxwellOutputConfig;
import com.zendesk.maxwell.replication.BinlogConnectorEvent;
import com.zendesk.maxwell.replication.BinlogConnectorEventListener;
import com.zendesk.maxwell.replication.BinlogConnectorLivenessMonitor;
import com.zendesk.maxwell.replication.BinlogPosition;
import com.zendesk.maxwell.replication.HeartbeatNotifier;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.replication.Replicator;
import com.zendesk.maxwell.replication.TableCache;
import com.zendesk.maxwell.row.HeartbeatRowMap;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.row.RowMapBuffer;
import com.zendesk.maxwell.schema.Schema;
import com.zendesk.maxwell.schema.SchemaStore;
import com.zendesk.maxwell.schema.SchemaStoreException;
import com.zendesk.maxwell.schema.Table;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
import com.zendesk.maxwell.schema.ddl.DDLMap;
import com.zendesk.maxwell.schema.ddl.ResolvedSchemaChange;
import com.zendesk.maxwell.scripting.Scripting;
import com.zendesk.maxwell.util.RunLoopProcess;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogConnectorReplicator
extends RunLoopProcess
implements Replicator,
BinaryLogClient.LifecycleListener {
    static final Logger LOGGER = LoggerFactory.getLogger(BinlogConnectorReplicator.class);
    /** Same value as the first argument given to the {@link RowMapBuffer} built in getTransactionRows. */
    private static final long MAX_TX_ELEMENTS = 10000L;
    /** Queue capacity used by the constructor overload that takes no explicit queue size. */
    public static int BINLOG_QUEUE_SIZE = 5000;
    /** Server error code that onCommunicationFailure records so the replication loop rethrows it. */
    public static final int BAD_BINLOG_ERROR_CODE = 1236;
    /** Server error code that onCommunicationFailure records so the replication loop rethrows it. */
    public static final int ACCESS_DENIED_ERROR_CODE = 1227;
    private final String clientID;
    private final String maxwellSchemaDatabaseName;
    protected final BinaryLogClient client;
    /** Maximum reconnection attempts; tryReconnect treats 0 as "unlimited". */
    private final int replicationReconnectionRetries;
    private BinlogConnectorEventListener binlogEventListener;
    /** Only created when heartbeats are enabled in the MySQL config; otherwise stays null. */
    private BinlogConnectorLivenessMonitor binlogLivenessMonitor;
    private final LinkedBlockingDeque<BinlogConnectorEvent> queue;
    private final TableCache tableCache;
    private final Scripting scripting;
    /**
     * Last server error recorded by onCommunicationFailure and rethrown by checkCommErrors.
     * Volatile because it is written from a client lifecycle callback and read by the
     * replication loop. NOTE(review): callbacks presumably run on the binlog client's own
     * thread -- confirm against the connector documentation.
     */
    private volatile ServerException lastCommError;
    private final boolean stopOnEOF;
    private boolean hitEOF = false;
    private Position lastHeartbeatPosition;
    private final HeartbeatNotifier heartbeatNotifier;
    /** When non-null, processRow stops the task at the first heartbeat >= this value. */
    private Long stopAtHeartbeat;
    private Filter filter;
    private Boolean ignoreMissingSchema;
    private final BootstrapController bootstrapper;
    private final AbstractProducer producer;
    /** Rows of the transaction currently being drained by getRow; may be null. */
    private RowMapBuffer rowBuffer;
    private final float bufferMemoryUsage;
    private final Counter rowCounter;
    private final Meter rowMeter;
    private SchemaStore schemaStore;
    private Histogram transactionRowCount;
    private Histogram transactionExecutionTime;
    /** True when the start position carried a GTID set, false for file/offset positioning. */
    private final Boolean gtidPositioning;
    /** Matches statements beginning with CREATE TABLE, ignoring case. */
    private static final Pattern createTablePattern = Pattern.compile("^CREATE\\s+TABLE", Pattern.CASE_INSENSITIVE);
    /**
     * Toggled by onConnect/onDisconnect and read by the replication loop; volatile for the
     * same cross-thread visibility reason as {@code lastCommError}.
     */
    private volatile boolean isConnected = false;
    private boolean replicatorStarted = false;

    /**
     * Convenience constructor: delegates to the full constructor with
     * {@code ignoreMissingSchema = false} and a queue capacity of {@link #BINLOG_QUEUE_SIZE}.
     */
    public BinlogConnectorReplicator(SchemaStore schemaStore, AbstractProducer producer, BootstrapController bootstrapper, MaxwellMysqlConfig mysqlConfig, Long replicaServerID, String maxwellSchemaDatabaseName, Metrics metrics, Position start, boolean stopOnEOF, String clientID, HeartbeatNotifier heartbeatNotifier, Scripting scripting, Filter filter, MaxwellOutputConfig outputConfig, float bufferMemoryUsage, int replicationReconnectionRetries) {
        this(
            schemaStore,
            producer,
            bootstrapper,
            mysqlConfig,
            replicaServerID,
            maxwellSchemaDatabaseName,
            metrics,
            start,
            stopOnEOF,
            clientID,
            heartbeatNotifier,
            scripting,
            filter,
            false,
            outputConfig,
            bufferMemoryUsage,
            replicationReconnectionRetries,
            BINLOG_QUEUE_SIZE
        );
    }

    /**
     * Builds the replicator and fully configures (but does not connect) its binlog client.
     *
     * <p>The client is positioned from {@code start}: by GTID set when the start position
     * carries one, otherwise by binlog file name and offset. This instance is registered as
     * a lifecycle listener, and a {@link BinlogConnectorEventListener} is registered to feed
     * the internal event queue.
     *
     * @param start position to begin replicating from; also the initial heartbeat position
     * @param stopOnEOF when true the client is put in non-blocking mode
     * @param ignoreMissingSchema passed through to the table cache when processing table-map events
     * @param replicationReconnectionRetries reconnection attempt limit; 0 means unlimited
     * @param binlogEventQueueSize capacity of the bounded event queue
     */
    public BinlogConnectorReplicator(SchemaStore schemaStore, AbstractProducer producer, BootstrapController bootstrapper, MaxwellMysqlConfig mysqlConfig, Long replicaServerID, String maxwellSchemaDatabaseName, Metrics metrics, Position start, boolean stopOnEOF, String clientID, HeartbeatNotifier heartbeatNotifier, Scripting scripting, Filter filter, boolean ignoreMissingSchema, MaxwellOutputConfig outputConfig, float bufferMemoryUsage, int replicationReconnectionRetries, int binlogEventQueueSize) {
        this.clientID = clientID;
        this.bootstrapper = bootstrapper;
        this.maxwellSchemaDatabaseName = maxwellSchemaDatabaseName;
        this.producer = producer;
        this.lastHeartbeatPosition = start;
        this.heartbeatNotifier = heartbeatNotifier;
        this.stopOnEOF = stopOnEOF;
        this.scripting = scripting;
        this.schemaStore = schemaStore;
        this.tableCache = new TableCache(maxwellSchemaDatabaseName);
        this.filter = filter;
        this.ignoreMissingSchema = ignoreMissingSchema;
        this.lastCommError = null;
        this.bufferMemoryUsage = bufferMemoryUsage;
        // Diamond instead of the raw type, so the queue is type-checked as a deque of events.
        this.queue = new LinkedBlockingDeque<>(binlogEventQueueSize);

        this.rowCounter = metrics.getRegistry().counter(metrics.metricName("row", "count"));
        this.rowMeter = metrics.getRegistry().meter(metrics.metricName("row", "meter"));
        this.transactionRowCount = metrics.getRegistry().histogram(metrics.metricName("transaction", "row_count"));
        this.transactionExecutionTime = metrics.getRegistry().histogram(metrics.metricName("transaction", "execution_time"));

        this.client = new BinaryLogClient(mysqlConfig.host, mysqlConfig.port.intValue(), mysqlConfig.user, mysqlConfig.password);
        this.client.setSSLMode(mysqlConfig.sslMode);
        this.client.setUseSendAnnotateRowsEvent(true);

        BinlogPosition startBinlog = start.getBinlogPosition();
        String gtidStr = startBinlog.getGtidSetStr();
        if (gtidStr != null) {
            LOGGER.info("Setting initial gtid to: {}", gtidStr);
            this.client.setGtidSet(gtidStr);
            this.gtidPositioning = true;
        } else {
            LOGGER.info("Setting initial binlog pos to: {}:{}", startBinlog.getFile(), startBinlog.getOffset());
            this.client.setBinlogFilename(startBinlog.getFile());
            this.client.setBinlogPosition(startBinlog.getOffset());
            this.gtidPositioning = false;
        }

        this.client.setKeepAlive(false);
        if (mysqlConfig.enableHeartbeat) {
            // The liveness monitor observes both lifecycle changes and the event stream.
            this.binlogLivenessMonitor = new BinlogConnectorLivenessMonitor(this.client);
            this.client.registerLifecycleListener((BinaryLogClient.LifecycleListener)this.binlogLivenessMonitor);
            this.client.registerEventListener((BinaryLogClient.EventListener)this.binlogLivenessMonitor);
        }

        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO,
            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY,
            EventDeserializer.CompatibilityMode.INVALID_DATE_AND_TIME_AS_MIN_VALUE
        );
        this.client.setEventDeserializer(eventDeserializer);

        this.binlogEventListener = new BinlogConnectorEventListener(this.client, this.queue, metrics, outputConfig);
        this.client.setBlocking(!stopOnEOF);
        this.client.registerEventListener((BinaryLogClient.EventListener)this.binlogEventListener);
        this.client.registerLifecycleListener(this);
        this.client.setServerId((long)replicaServerID.intValue());
        this.replicationReconnectionRetries = replicationReconnectionRetries;
    }

    /**
     * One iteration of the run loop: fetches the next row (if any), updates the row
     * metrics, runs the scripting hook for rows outside the maxwell schema database,
     * and hands the row to {@link #processRow(RowMap)}.
     */
    @Override
    public void work() throws Exception {
        RowMap next;
        try {
            next = getRow();
        } catch (InterruptedException ignored) {
            // An interrupted fetch is handled exactly like "no row available".
            next = null;
        }

        if (next == null) {
            return;
        }

        rowCounter.inc();
        rowMeter.mark();

        boolean scriptable = scripting != null && !isMaxwellRow(next);
        if (scriptable) {
            scripting.invoke(next);
        }
        processRow(next);
    }

    /**
     * Connects the binlog client (passing 5000 as the connect timeout argument) and marks
     * the replicator as started. The flag is only set if the connect call returns normally.
     */
    @Override
    public void startReplicator() throws Exception {
        client.connect(5000L);
        replicatorStarted = true;
    }

    /** Shutdown hook: stops the event listener first, then disconnects the binlog client. */
    @Override
    protected void beforeStop() throws Exception {
        binlogEventListener.stop();
        client.disconnect();
    }

    /**
     * Lifecycle callback for communication failures. Every failure is logged; server errors
     * with code {@link #ACCESS_DENIED_ERROR_CODE} or {@link #BAD_BINLOG_ERROR_CODE} are also
     * remembered so that checkCommErrors rethrows them from the replication loop.
     */
    @Override
    public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
        LOGGER.warn("communications failure in binlog:", (Throwable) ex);
        if (!(ex instanceof ServerException)) {
            return;
        }

        ServerException serverFailure = (ServerException) ex;
        int code = serverFailure.getErrorCode();
        if (code == ACCESS_DENIED_ERROR_CODE || code == BAD_BINLOG_ERROR_CODE) {
            lastCommError = serverFailure;
        }
        LOGGER.debug("error code: {} from server", Integer.valueOf(code));
    }

    /** @return the heartbeat value stored in the most recently recorded heartbeat position */
    @Override
    public Long getLastHeartbeatRead() {
        Position current = lastHeartbeatPosition;
        return current.getLastHeartbeatRead();
    }

    /**
     * Records the heartbeat value at which processRow should stop this task.
     *
     * @param heartbeat threshold; the task stops at the first heartbeat row whose value is
     *                  greater than or equal to it
     */
    @Override
    public void stopAtHeartbeat(long heartbeat) {
        stopAtHeartbeat = Long.valueOf(heartbeat);
    }

    /**
     * Rethrows the server error recorded by onCommunicationFailure, if there is one.
     *
     * @throws ServerException the recorded error
     */
    private void checkCommErrors() throws ServerException {
        if (lastCommError == null) {
            return;
        }
        throw lastCommError;
    }

    /**
     * @return true when the client is connected and, if a liveness monitor is installed,
     *         that monitor also reports the connection as alive
     */
    private boolean isConnectionAlive() {
        if (isConnected) {
            if (binlogLivenessMonitor == null) {
                return true;
            }
            return binlogLivenessMonitor.isAlive();
        }
        return false;
    }

    /**
     * Decides whether a row should be withheld from the producer. Rows from the maxwell
     * schema database are skipped unless they are bootstrap inserts; any other row is
     * skipped only when a bootstrapper exists and asks for it.
     */
    private boolean shouldSkipRow(RowMap row) throws IOException {
        boolean internalNonBootstrap = isMaxwellRow(row) && !isBootstrapInsert(row);
        if (internalNonBootstrap) {
            return true;
        }
        if (bootstrapper == null) {
            return false;
        }
        return bootstrapper.shouldSkip(row);
    }

    /**
     * Pushes a row to the producer. Heartbeat rows are always pushed and may stop the task
     * when a stop-at heartbeat has been requested and reached; all other rows are pushed
     * unless {@code shouldSkipRow} rejects them.
     */
    protected void processRow(RowMap row) throws Exception {
        if (!(row instanceof HeartbeatRowMap)) {
            if (!shouldSkipRow(row)) {
                producer.push(row);
            }
            return;
        }

        producer.push(row);
        if (stopAtHeartbeat == null) {
            return;
        }

        long thisHeartbeat = row.getPosition().getLastHeartbeatRead();
        if (thisHeartbeat >= stopAtHeartbeat) {
            LOGGER.info("received final heartbeat " + thisHeartbeat + "; stopping replicator");
            taskState.stopped();
        }
    }

    /**
     * Handles a row read from the heartbeats table. A row whose {@code client_id} differs
     * from this replicator's client id is returned untouched. Otherwise the heartbeat value
     * is recorded as the latest heartbeat position, the notifier is informed, and a
     * {@link HeartbeatRowMap} carrying the updated positions is returned in its place.
     */
    private RowMap processHeartbeats(RowMap row) {
        String owner = (String) row.getData("client_id");
        boolean ours = Objects.equals(owner, clientID);
        if (!ours) {
            return row;
        }

        long beat = (Long) row.getData("heartbeat");
        LOGGER.debug("replicator picked up heartbeat: {}", Long.valueOf(beat));

        lastHeartbeatPosition = row.getPosition().withHeartbeat(beat);
        heartbeatNotifier.heartbeat(beat);

        Position nextWithBeat = row.getNextPosition().withHeartbeat(beat);
        return HeartbeatRowMap.valueOf(row.getDatabase(), lastHeartbeatPosition, nextWithBeat);
    }

    /**
     * Applies a SQL statement to the schema store and publishes the resulting schema changes.
     * The bootstrapper (when present) is told the current schema id, each change that passes
     * the filter is wrapped in a {@link DDLMap}, run through the scripting hook if one is
     * configured, and pushed to the producer. The table cache is cleared at the end.
     */
    private void processQueryEvent(String dbName, String sql, SchemaStore schemaStore, Position position, Position nextPosition, Long timestamp) throws Exception {
        List<ResolvedSchemaChange> resolved = schemaStore.processSQL(sql, dbName, position);
        Long currentSchemaId = getSchemaId();

        if (bootstrapper != null) {
            bootstrapper.setCurrentSchemaID(currentSchemaId);
        }

        for (ResolvedSchemaChange schemaChange : resolved) {
            if (schemaChange.shouldOutput(filter)) {
                DDLMap ddlRow = new DDLMap(schemaChange, timestamp, sql, position, nextPosition, currentSchemaId);
                if (scripting != null) {
                    scripting.invoke(ddlRow);
                }
                producer.push(ddlRow);
            }
        }

        tableCache.clear();
    }

    /**
     * Unpacks a query event (database, SQL, positions tagged with the last heartbeat read,
     * and header timestamp) and forwards it to the six-argument overload.
     */
    private void processQueryEvent(BinlogConnectorEvent event) throws Exception {
        QueryEventData query = event.queryData();
        String database = query.getDatabase();
        String statement = query.getSql();
        Position at = Position.valueOf(event.getPosition(), getLastHeartbeatRead());
        Position after = Position.valueOf(event.getNextPosition(), getLastHeartbeatRead());
        Long stamp = event.getEvent().getHeader().getTimestamp();
        processQueryEvent(database, statement, schemaStore, at, after, stamp);
    }

    /**
     * Event-level filter check. System-blacklisted tables are always rejected; otherwise the
     * event is accepted if the table is system-whitelisted, included by the filter, or could
     * be included by a column-value filter over the given column names.
     */
    private boolean shouldOutputEvent(String database, String table, Filter filter, Set<String> columnNames) {
        if (Filter.isSystemBlacklisted(database, table)) {
            return false;
        }
        return filter.isSystemWhitelisted(database, table)
            || Filter.includes(filter, database, table)
            || Filter.couldIncludeFromColumnFilters(filter, database, table, columnNames);
    }

    /**
     * Row-level filter check: accepts the row when its table is system-whitelisted or when
     * the filter includes the table given the row's data.
     */
    private boolean shouldOutputRowMap(String database, String table, RowMap rowMap, Filter filter) {
        if (filter.isSystemWhitelisted(database, table)) {
            return true;
        }
        return filter.includes(database, table, rowMap.getData());
    }

    /** @return true when the row's database equals the configured maxwell schema database name */
    protected boolean isMaxwellRow(RowMap row) {
        String rowDatabase = row.getDatabase();
        return rowDatabase.equals(maxwellSchemaDatabaseName);
    }

    /**
     * @return true when the row is an {@code insert} into the {@code bootstrap} table of the
     *         maxwell schema database
     */
    private boolean isBootstrapInsert(RowMap row) {
        if (!row.getDatabase().equals(maxwellSchemaDatabaseName)) {
            return false;
        }
        if (!row.getRowType().equals("insert")) {
            return false;
        }
        return row.getTable().equals("bootstrap");
    }

    /**
     * Verifies the binlog connection and reconnects when it is no longer alive.
     *
     * <p>Any recorded communication error is rethrown first. Nothing further happens when the
     * client is disconnected in stop-on-EOF mode, or when the connection is still alive.
     * Otherwise the client is disconnected and reconnected: under GTID positioning the
     * file/offset are reset before reconnecting and a {@link ClientReconnectedException} is
     * thrown afterwards; under file/offset positioning the master server id must be the same
     * after reconnecting.
     *
     * @throws ClientReconnectedException after a successful reconnect under GTID positioning
     * @throws Exception when the master server id changed across the reconnect
     */
    private void ensureReplicatorThread() throws Exception {
        checkCommErrors();

        boolean finishedAtEof = !isConnected && stopOnEOF;
        if (finishedAtEof || isConnectionAlive()) {
            return;
        }

        client.disconnect();

        if (gtidPositioning.booleanValue()) {
            LOGGER.warn("replicator stopped at position: {} -- restarting", client.getGtidSet());
            client.setBinlogFilename("");
            client.setBinlogPosition(4L);
            tryReconnect();
            throw new ClientReconnectedException();
        }

        String stoppedAt = client.getBinlogFilename() + ":" + client.getBinlogPosition();
        LOGGER.warn("replicator stopped at position: {} -- restarting", stoppedAt);

        Long previousMasterId = client.getMasterServerId();
        tryReconnect();
        boolean masterChanged = client.getMasterServerId() != previousMasterId.longValue();
        if (masterChanged) {
            throw new Exception("Master id changed from " + previousMasterId + " to " + client.getMasterServerId() + " while using binlog coordinate positioning. Cannot continue with the info that we have");
        }
    }

    /**
     * Repeatedly tries to reconnect the binlog client until one attempt succeeds or the
     * configured number of attempts is used up. A retry limit of 0 means "retry forever".
     *
     * <p>Each failed attempt is logged, and the last failure is attached as the cause of the
     * final exception so the underlying reason is not lost.
     *
     * @throws TimeoutException when every permitted attempt has failed
     */
    private void tryReconnect() throws TimeoutException {
        final boolean unlimited = this.replicationReconnectionRetries == 0;
        Exception lastFailure = null;
        int reconnectionAttempts = 0;

        while (++reconnectionAttempts <= this.replicationReconnectionRetries || unlimited) {
            try {
                LOGGER.info(String.format("Reconnection attempt: %s of %s", reconnectionAttempts, this.replicationReconnectionRetries > 0 ? Integer.valueOf(this.replicationReconnectionRetries) : "unlimited"));
                this.client.connect(5000L);
                return;
            } catch (IOException | TimeoutException e) {
                // Keep retrying, but leave a trace of why this attempt failed.
                lastFailure = e;
                LOGGER.warn("Reconnection attempt {} failed: {}", reconnectionAttempts, e.toString());
            }
        }

        TimeoutException exhausted = new TimeoutException("Maximum reconnection attempts reached.");
        if (lastFailure != null) {
            exhausted.initCause(lastFailure);
        }
        throw exhausted;
    }

    /**
     * Reads events from the queue until the current transaction ends, collecting the rows
     * that pass the filter into a new {@link RowMapBuffer}.
     *
     * <p>On a commit event the last buffered row is flagged as the transaction commit, the
     * transaction metrics are updated, the XID is stored for XID events, and the buffer is
     * returned. A {@code ROLLBACK} query discards everything and returns a fresh buffer.
     * While no event is available the connection is checked (and re-established if needed).
     *
     * @param beginEvent the event that opened the transaction; its header timestamp is the
     *                   start point for the execution-time metric
     * @return the rows of the transaction
     * @throws ColumnDefCastException when a row value cannot be cast to its column type
     * @throws Exception anything raised while checking the connection or processing DDL
     */
    private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception {
        RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage);
        // Text of the most recent ROWS_QUERY / ANNOTATE_ROWS event, attached to later rows.
        String currentQuery = null;

        while (true) {
            BinlogConnectorEvent event = this.pollEvent();
            if (event == null) {
                this.ensureReplicatorThread();
                continue;
            }

            EventType eventType = event.getEvent().getHeader().getEventType();
            if (event.isCommitEvent()) {
                if (!buffer.isEmpty()) {
                    ((RowMap)buffer.getLast()).setTXCommit();
                    long timeSpent = ((RowMap)buffer.getLast()).getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp();
                    this.transactionExecutionTime.update(timeSpent);
                    this.transactionRowCount.update(buffer.size().longValue());
                }
                if (eventType == EventType.XID) {
                    buffer.setXid(event.xidData().getXid());
                }
                return buffer;
            }

            switch (eventType) {
                case WRITE_ROWS:
                case UPDATE_ROWS:
                case DELETE_ROWS:
                case EXT_WRITE_ROWS:
                case EXT_UPDATE_ROWS:
                case EXT_DELETE_ROWS: {
                    Table table = this.tableCache.getTable(event.getTableID());
                    if (table == null || !this.shouldOutputEvent(table.getDatabase(), table.getName(), this.filter, table.getColumnNames())) {
                        break;
                    }
                    List<RowMap> rows;
                    try {
                        rows = event.jsonMaps(table, this.getLastHeartbeatRead(), currentQuery);
                    } catch (ColumnDefCastException e) {
                        this.logColumnDefCastException(table, e);
                        throw e;
                    }
                    for (RowMap r : rows) {
                        if (this.shouldOutputRowMap(table.getDatabase(), table.getName(), r, this.filter)) {
                            buffer.add(r);
                        }
                    }
                    break;
                }
                case TABLE_MAP: {
                    TableMapEventData data = event.tableMapData();
                    this.tableCache.processEvent(this.getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());
                    break;
                }
                case ROWS_QUERY: {
                    RowsQueryEventData rqed = (RowsQueryEventData)event.getEvent().getData();
                    currentQuery = rqed.getQuery();
                    break;
                }
                case ANNOTATE_ROWS: {
                    AnnotateRowsEventData ared = (AnnotateRowsEventData)event.getEvent().getData();
                    currentQuery = ared.getRowsQuery();
                    break;
                }
                case QUERY: {
                    QueryEventData qe = event.queryData();
                    String sql = qe.getSql();
                    // Locale.ROOT keeps the prefix checks stable regardless of the JVM's
                    // default locale (e.g. Turkish casing of 'i').
                    String upperCaseSql = sql.toUpperCase(Locale.ROOT);
                    if (upperCaseSql.startsWith("SAVEPOINT")) {
                        LOGGER.debug("Ignoring SAVEPOINT in transaction: {}", (Object)qe);
                    } else if (createTablePattern.matcher(sql).find()) {
                        this.processQueryEvent(event);
                    } else if (upperCaseSql.startsWith("INSERT INTO MYSQL.RDS_")
                            || upperCaseSql.startsWith("DELETE FROM MYSQL.RDS_")
                            || upperCaseSql.startsWith("DROP TEMPORARY TABLE")
                            || upperCaseSql.startsWith("# DUMMY EVENT")) {
                        // Known statements that are ignored on purpose.
                    } else if (upperCaseSql.equals("ROLLBACK")) {
                        LOGGER.debug("rolling back transaction inside binlog.");
                        return new RowMapBuffer(0L);
                    } else {
                        LOGGER.warn("Unhandled QueryEvent @ {} inside transaction: {}", (Object)event.getPosition().fullPosition(), (Object)qe);
                    }
                    break;
                }
                default:
                    break;
            }
        }
    }

    /**
     * Logs a cast failure with the offending value, its class, and the fully-qualified
     * column, then stamps the table's database and name onto the exception.
     */
    private void logColumnDefCastException(Table table, ColumnDefCastException e) {
        String database = table.getDatabase();
        String tableName = table.getName();
        String castInfo = String.format(
            "Unable to cast %s (%s) into column %s.%s.%s (type '%s')",
            e.givenValue.toString(),
            e.givenValue.getClass().getName(),
            database,
            tableName,
            e.def.getName(),
            e.def.getType()
        );
        LOGGER.error(castInfo);
        e.database = table.getDatabase();
        e.table = table.getName();
    }

    /**
     * Returns the next row of the replication stream, or {@code null} when nothing is
     * available right now.
     *
     * <p>Rows still buffered from the current transaction are handed out first; rows from the
     * maxwell schema's {@code heartbeats} table are routed through {@code processHeartbeats}.
     * Once the buffer is drained, events are pulled from the queue: a {@code BEGIN} query or a
     * non-standalone MariaDB GTID event starts collecting a new transaction, other queries are
     * processed as schema changes, table-map events update the table cache, and a rotate event
     * clears the table cache (and ends replication in stop-on-EOF mode when its offset is
     * positive). The replicator is started on first use if that has not happened yet.
     *
     * @return the next row, or {@code null} if no event arrived, the stream hit EOF in
     *         stop-on-EOF mode, or the client is disconnected in stop-on-EOF mode
     */
    @Override
    public RowMap getRow() throws Exception {
        if (this.stopOnEOF && this.hitEOF) {
            return null;
        }
        if (!this.replicatorStarted) {
            LOGGER.warn("replicator was not started, calling startReplicator()...");
            this.startReplicator();
        }

        while (true) {
            if (this.rowBuffer != null && !this.rowBuffer.isEmpty()) {
                RowMap row = this.rowBuffer.removeFirst();
                if (row != null && this.isMaxwellRow(row) && row.getTable().equals("heartbeats")) {
                    return this.processHeartbeats(row);
                }
                return row;
            }

            BinlogConnectorEvent event = this.pollEvent();
            if (event == null) {
                if (this.stopOnEOF) {
                    if (this.isConnected) {
                        continue;
                    }
                    return null;
                }
                try {
                    this.ensureReplicatorThread();
                } catch (ClientReconnectedException ignored) {
                    // The client reconnected; there is simply no row to return on this pass.
                }
                return null;
            }

            switch (event.getType()) {
                case WRITE_ROWS:
                case UPDATE_ROWS:
                case DELETE_ROWS:
                case EXT_WRITE_ROWS:
                case EXT_UPDATE_ROWS:
                case EXT_DELETE_ROWS: {
                    LOGGER.warn("Started replication stream inside a transaction.  This shouldn't normally happen.");
                    LOGGER.warn("Assuming new transaction at unexpected event:" + event);
                    // Push the event back so getTransactionRows sees it as the first event.
                    // offerFirst does not block: on a full queue it returns false and the
                    // event is dropped, so make that visible instead of losing it silently.
                    if (!this.queue.offerFirst(event)) {
                        LOGGER.error("binlog event queue is full; could not re-queue event {} -- its rows will be missing from the output", event);
                    }
                    this.rowBuffer = this.getTransactionRows(event);
                    break;
                }
                case TABLE_MAP: {
                    TableMapEventData data = event.tableMapData();
                    this.tableCache.processEvent(this.getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());
                    break;
                }
                case QUERY: {
                    QueryEventData qe = event.queryData();
                    String sql = qe.getSql();
                    if (!"BEGIN".equals(sql)) {
                        this.processQueryEvent(event);
                        break;
                    }
                    try {
                        this.rowBuffer = this.getTransactionRows(event);
                    } catch (ClientReconnectedException e) {
                        // Drop the partial transaction and go back to polling after a reconnect.
                        this.rowBuffer = null;
                        break;
                    }
                    this.rowBuffer.setServerId(event.getEvent().getHeader().getServerId());
                    this.rowBuffer.setThreadId(qe.getThreadId());
                    this.rowBuffer.setSchemaId(this.getSchemaId());
                    break;
                }
                case MARIADB_GTID: {
                    MariadbGtidEventData g = event.mariaGtidData();
                    if ((g.getFlags() & MariadbGtidEventData.FL_STANDALONE) != 0) {
                        break;
                    }
                    try {
                        this.rowBuffer = this.getTransactionRows(event);
                    } catch (ClientReconnectedException e) {
                        // Drop the partial transaction and go back to polling after a reconnect.
                        this.rowBuffer = null;
                        break;
                    }
                    this.rowBuffer.setServerId(event.getEvent().getHeader().getServerId());
                    this.rowBuffer.setSchemaId(this.getSchemaId());
                    break;
                }
                case ROTATE: {
                    this.tableCache.clear();
                    if (this.stopOnEOF && event.getPosition().getOffset() > 0L) {
                        this.binlogEventListener.mustStop.set(true);
                        this.client.disconnect();
                        this.hitEOF = true;
                        return null;
                    }
                    break;
                }
                default:
                    break;
            }
        }
    }

    /**
     * Takes the next event from the queue, waiting up to 100 milliseconds for one.
     *
     * @return the next event, or {@code null} if none arrived within the wait
     * @throws InterruptedException if interrupted while waiting
     */
    protected BinlogConnectorEvent pollEvent() throws InterruptedException {
        BinlogConnectorEvent next = queue.poll(100L, TimeUnit.MILLISECONDS);
        return next;
    }

    /** @return the schema currently held by the schema store */
    @Override
    public Schema getSchema() throws SchemaStoreException {
        Schema current = schemaStore.getSchema();
        return current;
    }

    /** @return the id of the schema currently held by the schema store */
    @Override
    public Long getSchemaId() throws SchemaStoreException {
        Long currentId = schemaStore.getSchemaID();
        return currentId;
    }

    /** Lifecycle callback: logs the connection and marks the client as connected. */
    @Override
    public void onConnect(BinaryLogClient client) {
        isConnected = true;
        LOGGER.info("Binlog connected.");
    }

    /** Lifecycle callback: logs the deserialization failure, then its cause separately. */
    @Override
    public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
        LOGGER.warn("Event deserialization failure.", (Throwable) ex);
        Throwable rootHint = ex.getCause();
        LOGGER.warn("cause: ", rootHint);
    }

    /** Lifecycle callback: logs the disconnection and marks the client as disconnected. */
    @Override
    public void onDisconnect(BinaryLogClient client) {
        isConnected = false;
        LOGGER.info("Binlog disconnected.");
    }

    private class ClientReconnectedException
    extends Exception {
        private ClientReconnectedException() {
        }
    }
}

