/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql.legacy;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.EventDataDeserializationExceptionData;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.HaltingPredicate;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.RowDeserializers;
import io.debezium.connector.mysql.StopEventDataDeserializer;
import io.debezium.connector.mysql.legacy.AbstractReader;
import io.debezium.connector.mysql.legacy.BinlogReaderMetrics;
import io.debezium.connector.mysql.legacy.EventBuffer;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext;
import io.debezium.connector.mysql.legacy.MySqlTaskContext;
import io.debezium.connector.mysql.legacy.RecordMakers;
import io.debezium.connector.mysql.legacy.SourceInfo;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.BitSet;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.event.Level;

public class BinlogReader
extends AbstractReader {
    // Lower and upper bounds handed to ElapsedTimeStrategy.exponential(...) in the constructor; the
    // resulting strategy (pollOutputDelay) gates the periodic "records sent" log line in pollComplete().
    private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5L);
    private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1L);
    // Thread-name prefix that doStart() looks for among the client's threads to detect the keepalive thread.
    private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
    // When true, handleQueryEvent() also emits schema change records for applied DDL.
    private final boolean recordSchemaChangesInSourceRecords;
    private final RecordMakers recordMakers;
    private final SourceInfo source;
    // Dispatch table from binlog event type to handler; populated in doStart() and cleared by
    // handleEvent() when processing fails or is interrupted.
    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap(EventType.class);
    private final BinaryLogClient client;
    private final BinlogReaderMetrics metrics;
    private final Clock clock;
    private final ElapsedTimeStrategy pollOutputDelay;
    // Consulted by handleServerIncident() and by the DML branch of handleQueryEvent().
    private final CommonConnectorConfig.EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode;
    // Consulted by informAboutUnknownTableIfRequired().
    private final CommonConnectorConfig.EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode;
    // Restart bookkeeping, initialised from the source info in doStart().
    private int startingRowNumber = 0;
    // Records counted since the last periodic log line; reset in pollComplete().
    private long recordCounter = 0L;
    private long previousOutputMillis = 0L;
    private long initialEventsToSkip = 0L;
    // True while events of a partially processed transaction are being skipped after a restart.
    private boolean skipEvent = false;
    // Set by handleGtidEvent() when the current GTID's source is rejected by gtidDmlSourceFilter.
    private boolean ignoreDmlEventByGtidSource = false;
    // Null unless GTID_SOURCE_FILTER_DML_EVENTS is enabled (see the constructor).
    private final Predicate<String> gtidDmlSourceFilter;
    private final AtomicLong totalRecordCounter = new AtomicLong();
    // Source offset of the last record in the most recent non-empty batch; written in pollComplete()
    // and exposed (as a copy) by getLastOffset().
    private volatile Map<String, ?> lastOffset = null;
    // GTID set accumulated as GTID events arrive; only assigned in doStart() when GTID mode is enabled.
    private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
    private Heartbeat heartbeat;
    private MySqlJdbcContext connectionContext;
    // Fraction of the keepalive interval used as the client's heartbeat interval (the constructor
    // multiplies the keepalive interval by 0.8f).
    private final float heartbeatIntervalFactor = 0.8f;
    // Threads created through the client's thread factory, keyed by thread name.
    private final Map<String, Thread> binaryLogClientThreads = new ConcurrentHashMap<String, Thread>(4);

    /**
     * Creates a binlog reader that identifies itself to MySQL with the server id reported by
     * {@code context.serverId()}.
     *
     * @param name the reader name passed to the base reader
     * @param context the task context supplying configuration, schema and source info
     * @param acceptAndContinue the halting predicate passed to the base reader
     */
    public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
        this(name, context, acceptAndContinue, context.serverId());
    }

    /**
     * Creates a binlog reader and fully configures (but does not connect) the underlying
     * {@link BinaryLogClient}: thread factory, server id, SSL, keepalive/heartbeat intervals,
     * event and lifecycle listeners, and a customised event deserializer.
     *
     * @param name the reader name passed to the base reader and used for the metrics
     * @param context the task context supplying configuration, schema and source info
     * @param acceptAndContinue the halting predicate passed to the base reader
     * @param serverId the server id the binlog client registers with
     */
    public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) {
        super(name, context, acceptAndContinue);
        boolean filterDmlEventsByGtidSource;
        SSLSocketFactory sslSocketFactory;
        this.connectionContext = context.getConnectionContext();
        this.source = context.source();
        this.recordMakers = context.makeRecord();
        this.recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
        this.clock = context.getClock();
        this.eventDeserializationFailureHandlingMode = this.connectionContext.eventProcessingFailureHandlingMode();
        this.inconsistentSchemaHandlingMode = this.connectionContext.inconsistentSchemaHandlingMode();
        this.pollOutputDelay = ElapsedTimeStrategy.exponential(this.clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
        this.client = new BinaryLogClient(this.connectionContext.hostname(), this.connectionContext.port(), this.connectionContext.username(), this.connectionContext.password());
        // Remember every thread the client creates so doStart() can look for the keepalive thread by name.
        this.client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false, false, x -> this.binaryLogClientThreads.put(x.getName(), (Thread)x)));
        this.client.setServerId(serverId);
        this.client.setSSLMode(BinlogReader.sslModeFor(this.connectionContext.sslMode()));
        // Only install a custom socket factory when SSL is enabled and one could be built.
        if (this.connectionContext.sslModeEnabled() && (sslSocketFactory = this.getBinlogSslSocketFactory(this.connectionContext)) != null) {
            this.client.setSslSocketFactory(sslSocketFactory);
        }
        Configuration configuration = context.config();
        this.client.setKeepAlive(configuration.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        long keepAliveInterval = configuration.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        this.client.setKeepAliveInterval(keepAliveInterval);
        // Heartbeat interval is 80% of the keepalive interval (same value as heartbeatIntervalFactor).
        this.client.setHeartbeatInterval((long)((float)keepAliveInterval * 0.8f));
        // A buffer size of 0 means events go straight to handleEvent(); otherwise they pass through an EventBuffer.
        this.client.registerEventListener(context.bufferSizeForBinlogReader() == 0 ? this::handleEvent : new EventBuffer(context.bufferSizeForBinlogReader(), this)::add);
        this.client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        this.client.registerEventListener(this::onEvent);
        // The per-event logging listener is only registered when debug logging is enabled at construction time.
        if (this.logger.isDebugEnabled()) {
            this.client.registerEventListener(this::logEvent);
        }
        this.gtidDmlSourceFilter = (filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS)) ? context.gtidSourceFilter() : null;
        // Shared between the deserializer below (which records TABLE_MAP events) and the row deserializers.
        final HashMap<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
        EventDeserializer eventDeserializer = new EventDeserializer(){

            @Override
            public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
                try {
                    Event event = super.nextEvent(inputStream);
                    if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
                        TableMapEventData tableMapEvent = (TableMapEventData)event.getData();
                        tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
                    }
                    return event;
                }
                catch (EventDataDeserializationException edde) {
                    // I/O-caused failures are propagated unchanged.
                    if (edde.getCause() instanceof IOException) {
                        throw edde;
                    }
                    // Any other deserialization failure is converted into a synthetic INCIDENT event that
                    // carries the exception, so handleServerIncident() can apply the configured handling mode.
                    EventHeaderV4 header = new EventHeaderV4();
                    header.setEventType(EventType.INCIDENT);
                    header.setTimestamp(edde.getEventHeader().getTimestamp());
                    header.setServerId(edde.getEventHeader().getServerId());
                    if (edde.getEventHeader() instanceof EventHeaderV4) {
                        header.setEventLength(((EventHeaderV4)edde.getEventHeader()).getEventLength());
                        header.setNextPosition(((EventHeaderV4)edde.getEventHeader()).getNextPosition());
                        header.setFlags(((EventHeaderV4)edde.getEventHeader()).getFlags());
                    }
                    EventDataDeserializationExceptionData data = new EventDataDeserializationExceptionData(edde);
                    return new Event(header, data);
                }
            }
        };
        // Replace the default deserializers for STOP, GTID and all row events (plain and EXT_* variants).
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
        eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));
        this.client.setEventDeserializer(eventDeserializer);
        this.metrics = new BinlogReaderMetrics(this.client, context, name, this.changeEventQueueMetrics);
        this.heartbeat = Heartbeat.create(configuration.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS), context.topicSelector().getHeartbeatTopic(), context.getConnectorConfig().getLogicalName());
    }

    /**
     * Registers this reader's metrics, passing along the reader's logger.
     */
    @Override
    protected void doInitialize() {
        metrics.register(logger);
    }

    /**
     * Unregisters this reader's metrics, passing along the reader's logger.
     */
    @Override
    public void doDestroy() {
        metrics.unregister(logger);
    }

    /**
     * Installs the event handlers, positions the binlog client (by GTID set when GTID mode is
     * enabled and a filtered set is available, otherwise by binlog filename/position), initialises
     * the restart-skipping state and, if the reader is still running, connects the client.
     *
     * @throws ConnectException if the connection times out, authentication fails, or any other
     *         error occurs while connecting
     */
    @Override
    protected void doStart() {
        this.context.dbSchema().assureNonEmptySchema();
        EnumSet<Envelope.Operation> skippedOperations = this.context.getConnectorConfig().getSkippedOperations();
        // Handlers that are always installed.
        this.eventHandlers.put(EventType.STOP, this::handleServerStop);
        this.eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        this.eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        this.eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        this.eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        this.eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
        // Row-event handlers are omitted for operations the connector is configured to skip; events
        // without a handler end up in ignoreEvent() via handleEvent().
        if (!skippedOperations.contains((Object)Envelope.Operation.CREATE)) {
            this.eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
            this.eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        }
        if (!skippedOperations.contains((Object)Envelope.Operation.UPDATE)) {
            this.eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
            this.eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        }
        if (!skippedOperations.contains((Object)Envelope.Operation.DELETE)) {
            this.eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
            this.eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        }
        this.eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        this.eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        this.eventHandlers.put(EventType.XID, this::handleTransactionCompletion);
        if (this.context.includeSqlQuery()) {
            this.eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }
        boolean isGtidModeEnabled = this.connectionContext.isGtidModeEnabled();
        this.metrics.setIsGtidModeEnabled(isGtidModeEnabled);
        String availableServerGtidStr = this.connectionContext.knownGtidSet();
        if (isGtidModeEnabled) {
            this.eventHandlers.put(EventType.GTID, this::handleGtidEvent);
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
            GtidSet purgedServerGtidSet = this.connectionContext.purgedGtidSet();
            this.logger.info("GTID set purged on server: {}", (Object)purgedServerGtidSet);
            GtidSet filteredGtidSet = this.context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // Position the client by GTID set and record the same set as completed in the source info.
                this.logger.info("Registering binlog reader with GTID set: {}", (Object)filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                this.client.setGtidSet(filteredGtidSetStr);
                this.source.setCompletedGtidSet(filteredGtidSetStr);
                this.gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            } else {
                // No filtered GTID set: fall back to filename/position and start with an empty GTID set.
                this.client.setBinlogFilename(this.source.binlogFilename());
                this.client.setBinlogPosition(this.source.binlogPosition());
                this.gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        } else {
            this.client.setBinlogFilename(this.source.binlogFilename());
            this.client.setBinlogPosition(this.source.binlogPosition());
        }
        this.initialEventsToSkip = this.source.eventsToSkipUponRestart();
        this.startingRowNumber = this.source.rowsToSkipUponRestart();
        this.skipEvent = false;
        // Result intentionally discarded; presumably this primes the elapsed-time strategy — TODO confirm.
        this.pollOutputDelay.hasElapsed();
        this.previousOutputMillis = this.clock.currentTimeInMillis();
        if (this.isRunning()) {
            long timeout = this.context.getConnectorConfig().getConnectionTimeout().toMillis();
            long started = this.context.getClock().currentTimeInMillis();
            try {
                this.logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", (Object)timeout);
                this.client.connect(timeout);
                if (this.client.isKeepAlive()) {
                    // Poll every 100 ms, at most 50 times, for a live thread whose name starts with the
                    // keepalive prefix among the threads recorded by the client's thread factory.
                    this.logger.info("Waiting for keepalive thread to start");
                    Metronome metronome = Metronome.parker(Duration.ofMillis(100L), this.clock);
                    int waitAttempts = 50;
                    boolean keepAliveThreadRunning = false;
                    while (!keepAliveThreadRunning && waitAttempts-- > 0) {
                        for (Thread t : this.binaryLogClientThreads.values()) {
                            if (!t.getName().startsWith(KEEPALIVE_THREAD_NAME) || !t.isAlive()) continue;
                            this.logger.info("Keepalive thread is running");
                            keepAliveThreadRunning = true;
                        }
                        // Note: the pause also runs once after the thread has been found.
                        metronome.pause();
                    }
                }
            }
            catch (TimeoutException e) {
                // A timeout is only reported when at least 90% of the configured timeout actually elapsed;
                // an earlier TimeoutException is dropped without logging.
                // NOTE(review): presumably an early timeout means the reader was stopped while connecting — confirm.
                long duration = this.context.getClock().currentTimeInMillis() - started;
                if ((double)duration > 0.9 * (double)timeout) {
                    // toSeconds() truncates to whole seconds before the value is widened to double.
                    double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                    throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " + this.connectionContext.hostname() + ":" + this.connectionContext.port() + " with user '" + this.connectionContext.username() + "'", e);
                }
            }
            catch (AuthenticationException e) {
                throw new ConnectException("Failed to authenticate to the MySQL database at " + this.connectionContext.hostname() + ":" + this.connectionContext.port() + " with user '" + this.connectionContext.username() + "'", e);
            }
            catch (Throwable e) {
                // NOTE(review): this also catches anything thrown by metronome.pause() above; if that can be an
                // InterruptedException, the interrupt status is not restored here — confirm.
                throw new ConnectException("Unable to connect to the MySQL database at " + this.connectionContext.hostname() + ":" + this.connectionContext.port() + " with user '" + this.connectionContext.username() + "': " + e.getMessage(), e);
            }
        }
    }

    /**
     * Disconnects the binlog client and reconnects it at the given binlog position. Does nothing
     * when the reader is not running. An {@link IOException} raised while doing so is logged and
     * not rethrown.
     *
     * @param position the binlog filename and position to restart reading from
     */
    protected void rewindBinaryLogClient(BinlogPosition position) {
        try {
            if (!isRunning()) {
                return;
            }
            logger.debug("Rewinding binlog to position {}", (Object) position);
            client.disconnect();
            client.setBinlogFilename(position.getFilename());
            client.setBinlogPosition(position.getPosition());
            client.connect();
        }
        catch (IOException reconnectFailure) {
            logger.error("Unexpected error when re-connecting to the MySQL binary log reader", (Throwable) reconnectFailure);
        }
    }

    /**
     * Returns a copy of the source offset of the last record seen by {@code pollComplete}.
     *
     * @return a new mutable map holding the last recorded offset, or {@code null} if no offset
     *         has been recorded yet
     */
    public Map<String, ?> getLastOffset() {
        // Read the volatile field exactly once so the null check and the copy see the same map.
        final Map<String, ?> offset = this.lastOffset;
        return offset == null ? null : new HashMap<String, Object>(offset);
    }

    /**
     * Disconnects the binlog client if it is connected and then releases the reader's resources.
     * {@code cleanupResources()} runs even when disconnecting throws, so a failed disconnect
     * cannot leave the reader un-cleaned. An {@link IOException} is logged and not rethrown.
     */
    @Override
    protected void doStop() {
        try {
            try {
                if (this.client.isConnected()) {
                    this.logger.debug("Stopping binlog reader '{}', last recorded offset: {}", (Object)this.name(), this.lastOffset);
                    this.client.disconnect();
                }
            }
            finally {
                // Previously skipped whenever disconnect() threw.
                this.cleanupResources();
            }
        }
        catch (IOException e) {
            this.logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", (Object)this.name(), (Object)e);
        }
    }

    /**
     * Cleanup hook; this reader only emits a debug message.
     */
    @Override
    protected void doCleanup() {
        logger.debug("Completed writing all records that were read from the binlog before being stopped");
    }

    /**
     * Updates the record counters for a batch and, for a non-empty batch, remembers the source
     * offset of its last record. When the output delay has elapsed, logs how many records were
     * counted since the previous log line and then resets that counter and the reference time,
     * even if logging fails.
     *
     * @param batch the records of the completed poll; may be empty
     */
    @Override
    protected void pollComplete(List<SourceRecord> batch) {
        final int count = batch.size();
        recordCounter += (long) count;
        totalRecordCounter.addAndGet(count);
        if (count == 0) {
            return;
        }
        lastOffset = batch.get(count - 1).sourceOffset();
        if (!pollOutputDelay.hasElapsed()) {
            return;
        }
        final long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis;
        try {
            if (logger.isInfoEnabled()) {
                context.temporaryLoggingContext("binlog", () -> logger.info("{} records sent during previous {}, last recorded offset: {}", recordCounter, Strings.duration(millisSinceLastOutput), lastOffset));
            }
        }
        finally {
            // Start a new reporting window regardless of whether the log call succeeded.
            recordCounter = 0L;
            previousOutputMillis += millisSinceLastOutput;
        }
    }

    /**
     * Logs the received event at trace level.
     *
     * @param event the binlog event
     */
    protected void logEvent(Event event) {
        logger.trace("Received event: {}", event);
    }

    /**
     * Updates the "milliseconds behind source" metric from an event. Heartbeat events set the
     * metric to zero; events with a zero timestamp are logged and leave the metric untouched;
     * for all others the metric is the current clock time minus the event timestamp.
     *
     * @param event the binlog event
     */
    protected void onEvent(Event event) {
        if (event.getHeader().getEventType() == EventType.HEARTBEAT) {
            metrics.setMilliSecondsBehindSource(0L);
            return;
        }
        final long eventTimestamp = event.getHeader().getTimestamp();
        if (eventTimestamp == 0L) {
            logger.trace("Received unexpected event with 0 timestamp: {}", event);
            return;
        }
        final long lagMillis = clock.currentTimeInMillis() - eventTimestamp;
        logger.trace("Current milliseconds behind source: {} ms", (Object) lagMillis);
        metrics.setMilliSecondsBehindSource(lagMillis);
    }

    /**
     * Fallback handler used by {@code handleEvent} for event types without a registered handler;
     * only logs at trace level.
     *
     * @param event the binlog event
     */
    protected void ignoreEvent(Event event) {
        logger.trace("Ignoring event due to missing handler: {}", event);
    }

    /**
     * Entry point for every binlog event: records the event's timestamp, server id and position
     * in the source info, dispatches the event to the handler registered for its type (or to
     * {@code ignoreEvent}), emits a heartbeat if due, and marks the event as completed.
     * <p>
     * A {@link RuntimeException} from a handler marks the reader as failed; an interruption
     * restores the thread's interrupt flag. In both cases all handlers are removed, so later
     * events are ignored.
     *
     * @param event the binlog event; {@code null} is ignored
     */
    protected void handleEvent(Event event) {
        if (event == null) {
            return;
        }
        // Typed as EventHeader (not Object) so the accessor calls below resolve; fully qualified
        // because the type is not among the file's imports.
        com.github.shyiko.mysql.binlog.event.EventHeader eventHeader = event.getHeader();
        // Heartbeat events do not update the binlog timestamp.
        if (!eventHeader.getEventType().equals((Object)EventType.HEARTBEAT)) {
            this.source.setBinlogTimestampSeconds(eventHeader.getTimestamp() / 1000L);
        }
        this.source.setBinlogServerId(eventHeader.getServerId());
        EventType eventType = eventHeader.getEventType();
        if (eventType == EventType.ROTATE) {
            // A rotate event carries the new binlog file and position; unwrap it if the deserializer wrapped it.
            EventData eventData = event.getData();
            RotateEventData rotateEventData = eventData instanceof EventDeserializer.EventDataWrapper ? (RotateEventData)((EventDeserializer.EventDataWrapper)eventData).getInternal() : (RotateEventData)eventData;
            this.source.setBinlogStartPoint(rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
        } else if (eventHeader instanceof EventHeaderV4) {
            EventHeaderV4 trackableEventHeader = (EventHeaderV4)eventHeader;
            this.source.setEventPosition(trackableEventHeader.getPosition(), trackableEventHeader.getEventLength());
        }
        try {
            this.eventHandlers.getOrDefault((Object)eventType, this::ignoreEvent).accept(event);
            this.heartbeat.heartbeat(this.source.partition(), this.source.offset(), this::enqueueRecord);
            this.source.completeEvent();
            // While skipping after a restart, count down the events still to be skipped.
            if (this.skipEvent) {
                --this.initialEventsToSkip;
                this.skipEvent = this.initialEventsToSkip > 0L;
            }
        }
        catch (RuntimeException e) {
            this.logReaderState();
            this.failed(e, "Error processing binlog event");
            this.eventHandlers.clear();
            this.logger.info("Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.eventHandlers.clear();
            this.logger.info("Stopped processing binlog events due to thread interruption");
        }
    }

    /**
     * Returns the event's data, unwrapping it first when the deserializer delivered it inside an
     * {@link EventDeserializer.EventDataWrapper}.
     *
     * @param event the binlog event
     * @param <T> the data type the caller expects; not checked here, so a wrong expectation
     *        surfaces as a {@link ClassCastException} at the call site
     * @return the (unwrapped) event data, possibly {@code null} if the event carries none
     */
    @SuppressWarnings("unchecked")
    protected <T extends EventData> T unwrapData(Event event) {
        EventData eventData = event.getData();
        if (eventData instanceof EventDeserializer.EventDataWrapper) {
            eventData = ((EventDeserializer.EventDataWrapper)eventData).getInternal();
        }
        // Unchecked by design: the caller picks T based on the event type it is handling.
        return (T)eventData;
    }

    /**
     * Handler for STOP events; only logs at debug level.
     *
     * @param event the binlog event
     */
    protected void handleServerStop(Event event) {
        logger.debug("Server stopped: {}", event);
    }

    /**
     * Handler for HEARTBEAT events; only logs at trace level.
     *
     * @param event the binlog event
     */
    protected void handleServerHeartbeat(Event event) {
        logger.trace("Server heartbeat: {}", event);
    }

    /**
     * Handler for INCIDENT events. Incidents that carry a deserialization failure are counted as
     * erroneous events and then treated according to the event deserialization failure handling
     * mode: {@code FAIL} logs an error and throws, {@code WARN} logs a warning, any other mode
     * does nothing further. All other incidents are logged as errors.
     *
     * @param event the binlog event
     * @throws RuntimeException wrapping the deserialization failure when the mode is {@code FAIL}
     */
    protected void handleServerIncident(Event event) {
        if (!(event.getData() instanceof EventDataDeserializationExceptionData)) {
            logger.error("Server incident: {}", event);
            return;
        }
        metrics.onErroneousEvent("source = " + event.toString());
        final EventDataDeserializationExceptionData failure = (EventDataDeserializationExceptionData) event.getData();
        final EventHeaderV4 failedHeader = (EventHeaderV4) failure.getCause().getEventHeader();
        final CommonConnectorConfig.EventProcessingFailureHandlingMode mode = eventDeserializationFailureHandlingMode;
        if (mode == CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL) {
            logger.error("Error while deserializing binlog event at offset {}.{}Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", source.offset(), System.lineSeparator(), failedHeader.getPosition(), failedHeader.getNextPosition(), source.binlogFilename());
            throw new RuntimeException(failure.getCause());
        }
        else if (mode == CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN) {
            // The trailing exception has no placeholder and is passed as the last log argument.
            logger.warn("Error while deserializing binlog event at offset {}.{}This exception will be ignored and the event be skipped.{}Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", source.offset(), System.lineSeparator(), System.lineSeparator(), failedHeader.getPosition(), failedHeader.getNextPosition(), source.binlogFilename(), failure.getCause());
        }
    }

    /**
     * Handler for ROTATE events: logs the event and clears the record makers.
     *
     * @param event the binlog event
     */
    protected void handleRotateLogsEvent(Event event) {
        logger.debug("Rotating logs: {}", event);
        final RotateEventData rotation = (RotateEventData) unwrapData(event);
        assert (rotation != null);
        recordMakers.clear();
    }

    /**
     * Handler for GTID events: adds the GTID to the accumulated GTID set, records it in the
     * source info and the metrics, and decides whether the DML events that follow are to be
     * ignored because the GTID's source UUID is rejected by the GTID DML source filter.
     *
     * @param event the binlog event
     */
    protected void handleGtidEvent(Event event) {
        this.logger.debug("GTID transaction: {}", (Object)event);
        GtidEventData gtidEvent = (GtidEventData)this.unwrapData(event);
        String gtid = gtidEvent.getGtid();
        this.gtidSet.add(gtid);
        this.source.startGtid(gtid, this.gtidSet.toString());
        this.ignoreDmlEventByGtidSource = false;
        if (this.gtidDmlSourceFilter != null && gtid != null) {
            // The source UUID is everything before the first ':'. Locate the separator in the
            // trimmed string, the same string that is cut, so leading whitespace cannot shift it.
            String trimmedGtid = gtid.trim();
            String uuid = trimmedGtid.substring(0, trimmedGtid.indexOf(":"));
            if (!this.gtidDmlSourceFilter.test(uuid)) {
                this.ignoreDmlEventByGtidSource = true;
            }
        }
        this.metrics.onGtidChange(gtid);
    }

    /**
     * Handler for ROWS_QUERY events: stores the event's query text in the source info.
     *
     * @param event the binlog event
     */
    protected void handleRowsQuery(Event event) {
        final RowsQueryEventData rowsQuery = (RowsQueryEventData) unwrapData(event);
        source.setQuery(rowsQuery.getQuery());
    }

    /**
     * Handler for QUERY events. {@code BEGIN} starts a transaction (and arms event skipping when
     * a partially processed transaction is being restarted), {@code COMMIT} completes it,
     * {@code XA} statements and statements rejected by the DDL filter are ignored, and DML
     * statements are treated according to the event deserialization failure handling mode.
     * Everything else is applied to the database schema as DDL, optionally producing schema
     * change records.
     *
     * @param event the binlog event
     * @throws InterruptedException declared for the handler contract; propagated from handlers it delegates to
     * @throws ConnectException if a DML statement is received and the handling mode is {@code FAIL}
     */
    protected void handleQueryEvent(Event event) throws InterruptedException {
        QueryEventData command = (QueryEventData)this.unwrapData(event);
        this.logger.debug("Received query command: {}", (Object)event);
        String sql = command.getSql().trim();
        if (sql.equalsIgnoreCase("BEGIN")) {
            this.source.startNextTransaction();
            this.source.setBinlogThread(command.getThreadId());
            if (this.initialEventsToSkip != 0L) {
                this.logger.debug("Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event", (Object)this.initialEventsToSkip, (Object)this.startingRowNumber);
                this.skipEvent = true;
            }
            return;
        }
        if (sql.equalsIgnoreCase("COMMIT")) {
            this.handleTransactionCompletion(event);
            return;
        }
        // Locale.ROOT keeps the keyword comparison independent of the JVM's default locale
        // (e.g. a Turkish locale upper-cases 'i' to a dotted capital I and would miss "INSERT ").
        String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase(java.util.Locale.ROOT);
        if (upperCasedStatementBegin.startsWith("XA ")) {
            return;
        }
        if (this.context.ddlFilter().test(sql)) {
            this.logger.debug("DDL '{}' was filtered out of processing", (Object)sql);
            return;
        }
        if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ") || upperCasedStatementBegin.equals("DELETE ")) {
            // DML in a QUERY event is never applied as DDL: fail, warn, or silently skip.
            if (this.eventDeserializationFailureHandlingMode == CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL) {
                throw new ConnectException("Received DML '" + sql + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
            }
            if (this.eventDeserializationFailureHandlingMode == CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN) {
                this.logger.warn("Warning only: Received DML '" + sql + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
            }
            return;
        }
        if (sql.equalsIgnoreCase("ROLLBACK")) {
            // Only a warning here; the statement is still handed to the DDL processing below.
            this.logger.warn("Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering", (Object)MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
        }
        this.context.dbSchema().applyDdl(this.context.source(), command.getDatabase(), command.getSql(), (dbName, tables, statements) -> {
            if (this.recordSchemaChangesInSourceRecords && this.recordMakers.schemaChanges(dbName, tables, statements, x$0 -> super.enqueueRecord((SourceRecord)x$0)) > 0) {
                this.logger.debug("Recorded DDL statements for database '{}': {}", (Object)dbName, (Object)statements);
            }
        });
    }

    /**
     * Marks the current transaction as committed in the source info, resets the binlog thread id
     * to -1 and clears the per-transaction skip/ignore flags.
     *
     * @param event the event that completed the transaction; not inspected
     */
    private void handleTransactionCompletion(Event event) {
        source.commitTransaction();
        source.setBinlogThread(-1L);
        // Per-transaction state ends with the transaction.
        skipEvent = false;
        ignoreDmlEventByGtidSource = false;
    }

    /**
     * Handler for TABLE_MAP events: associates the event's table number with the table it names.
     * When the record makers do not accept the assignment, the unknown-table reporting is invoked.
     *
     * @param event the binlog event
     */
    protected void handleUpdateTableMetadata(Event event) {
        final TableMapEventData metadata = (TableMapEventData) unwrapData(event);
        final long tableNumber = metadata.getTableId();
        final TableId tableId = new TableId(metadata.getDatabase(), null, metadata.getTable());
        if (!recordMakers.assign(tableNumber, tableId)) {
            informAboutUnknownTableIfRequired(event, tableId, "update table metadata");
            return;
        }
        logger.debug("Received update table metadata event: {}", event);
    }

    /**
     * Reports an event for a table the record makers do not know. If the table is not captured
     * (or no table id is given) the event is merely counted as filtered. Otherwise it is counted
     * as erroneous and treated according to the inconsistent schema handling mode: {@code FAIL}
     * logs an error and throws, {@code WARN} logs a warning, any other mode logs at debug level.
     *
     * @param event the binlog event
     * @param tableId the table the event refers to; may be {@code null}
     * @param typeToLog short description of the event kind, used in the "filtering" log message
     * @throws ConnectException if the table is captured and the mode is {@code FAIL}
     */
    private void informAboutUnknownTableIfRequired(Event event, TableId tableId, String typeToLog) {
        if (tableId == null || !context.dbSchema().isTableCaptured(tableId)) {
            logger.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
            metrics.onFilteredEvent("source = " + tableId);
            return;
        }
        metrics.onErroneousEvent("source = " + tableId + ", event " + event);
        final EventHeaderV4 header = (EventHeaderV4) event.getHeader();
        final CommonConnectorConfig.EventProcessingFailureHandlingMode mode = inconsistentSchemaHandlingMode;
        if (mode == CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL) {
            logger.error("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", event, source.offset(), tableId, System.lineSeparator(), header.getPosition(), header.getNextPosition(), source.binlogFilename());
            throw new ConnectException("Encountered change event for table " + tableId + " whose schema isn't known to this connector");
        }
        final Object[] details = {event, source.offset(), tableId, System.lineSeparator(), System.lineSeparator(), header.getPosition(), header.getNextPosition(), source.binlogFilename()};
        if (mode == CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN) {
            logger.warn("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}The event will be ignored.{}Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", details);
        }
        else {
            logger.debug("Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}The event will be ignored.{}Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", details);
        }
    }

    /**
     * Produces insert records for the rows carried by a {@link WriteRowsEventData} event.
     * <p>
     * The event is ignored when {@code skipEvent} or {@code ignoreDmlEventByGtidSource}
     * is set. Rows before {@code startingRowNumber} are not emitted, and
     * {@code startingRowNumber} is reset to 0 once the event has been handled
     * (but not when the event was ignored by one of the two flags).
     *
     * @param event the binlog event whose data is a {@link WriteRowsEventData}
     * @throws InterruptedException declared for record creation and enqueueing
     */
    protected void handleInsert(Event event) throws InterruptedException {
        if (this.skipEvent) {
            this.logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        if (this.ignoreDmlEventByGtidSource) {
            this.logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }

        WriteRowsEventData write = (WriteRowsEventData) this.unwrapData(event);
        long tableNumber = write.getTableId();
        BitSet includedColumns = write.getIncludedColumns();
        RecordMakers.RecordsForTable recordMaker = this.recordMakers.forTable(tableNumber, includedColumns,
                sourceRecord -> super.enqueueRecord((SourceRecord) sourceRecord));

        if (recordMaker == null) {
            this.informAboutUnknownTableIfRequired(event, this.recordMakers.getTableIdFromTableNumber(tableNumber), "insert row");
        } else {
            List<Serializable[]> rows = write.getRows();
            Instant ts = this.context.getClock().currentTimeAsInstant();
            int numRows = rows.size();
            if (this.startingRowNumber >= numRows) {
                this.logger.debug("Skipping previously processed insert event: {}", event);
            } else {
                int count = 0;
                for (int row = this.startingRowNumber; row < numRows; ++row) {
                    count += recordMaker.create(rows.get(row), ts, row, numRows);
                }
                if (this.logger.isDebugEnabled()) {
                    if (this.startingRowNumber == 0) {
                        this.logger.debug("Recorded {} insert record(s) for event: {}", count, event);
                    } else {
                        this.logger.debug("Recorded {} insert record(s) for last {} row(s) in event: {}", count, numRows - this.startingRowNumber, event);
                    }
                }
            }
        }
        // The starting row only applies to the first row event handled.
        this.startingRowNumber = 0;
    }

    /**
     * Produces update records for the rows carried by an {@link UpdateRowsEventData} event.
     * <p>
     * Each row is a before/after pair. The event is ignored when {@code skipEvent} or
     * {@code ignoreDmlEventByGtidSource} is set. Rows before {@code startingRowNumber}
     * are not emitted, and {@code startingRowNumber} is reset to 0 once the event has
     * been handled (but not when the event was ignored by one of the two flags).
     *
     * @param event the binlog event whose data is an {@link UpdateRowsEventData}
     * @throws InterruptedException declared for record creation and enqueueing
     */
    protected void handleUpdate(Event event) throws InterruptedException {
        if (this.skipEvent) {
            this.logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        if (this.ignoreDmlEventByGtidSource) {
            this.logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }

        UpdateRowsEventData update = (UpdateRowsEventData) this.unwrapData(event);
        long tableNumber = update.getTableId();
        BitSet includedColumns = update.getIncludedColumns();
        RecordMakers.RecordsForTable recordMaker = this.recordMakers.forTable(tableNumber, includedColumns,
                sourceRecord -> super.enqueueRecord((SourceRecord) sourceRecord));

        if (recordMaker == null) {
            this.informAboutUnknownTableIfRequired(event, this.recordMakers.getTableIdFromTableNumber(tableNumber), "update row");
        } else {
            List<Map.Entry<Serializable[], Serializable[]>> rows = update.getRows();
            Instant ts = this.context.getClock().currentTimeAsInstant();
            int numRows = rows.size();
            if (this.startingRowNumber >= numRows) {
                this.logger.debug("Skipping previously processed update event: {}", event);
            } else {
                int count = 0;
                for (int row = this.startingRowNumber; row < numRows; ++row) {
                    Map.Entry<Serializable[], Serializable[]> rowChange = rows.get(row);
                    // Key holds the row image before the change, value the image after it.
                    Object[] oldValues = rowChange.getKey();
                    Object[] newValues = rowChange.getValue();
                    count += recordMaker.update(oldValues, newValues, ts, row, numRows);
                }
                if (this.logger.isDebugEnabled()) {
                    if (this.startingRowNumber == 0) {
                        this.logger.debug("Recorded {} update record(s) for event: {}", count, event);
                    } else {
                        this.logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}", count, numRows - this.startingRowNumber, event);
                    }
                }
            }
        }
        // The starting row only applies to the first row event handled.
        this.startingRowNumber = 0;
    }

    /**
     * Produces delete records for the rows carried by a {@link DeleteRowsEventData} event.
     * <p>
     * The event is ignored when {@code skipEvent} or {@code ignoreDmlEventByGtidSource}
     * is set. Rows before {@code startingRowNumber} are not emitted, and
     * {@code startingRowNumber} is reset to 0 once the event has been handled
     * (but not when the event was ignored by one of the two flags).
     *
     * @param event the binlog event whose data is a {@link DeleteRowsEventData}
     * @throws InterruptedException declared for record creation and enqueueing
     */
    protected void handleDelete(Event event) throws InterruptedException {
        if (this.skipEvent) {
            this.logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        if (this.ignoreDmlEventByGtidSource) {
            this.logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }

        DeleteRowsEventData deleted = (DeleteRowsEventData) this.unwrapData(event);
        long tableNumber = deleted.getTableId();
        BitSet includedColumns = deleted.getIncludedColumns();
        RecordMakers.RecordsForTable recordMaker = this.recordMakers.forTable(tableNumber, includedColumns,
                sourceRecord -> super.enqueueRecord((SourceRecord) sourceRecord));

        if (recordMaker == null) {
            this.informAboutUnknownTableIfRequired(event, this.recordMakers.getTableIdFromTableNumber(tableNumber), "delete row");
        } else {
            List<Serializable[]> rows = deleted.getRows();
            Instant ts = this.context.getClock().currentTimeAsInstant();
            int numRows = rows.size();
            if (this.startingRowNumber >= numRows) {
                this.logger.debug("Skipping previously processed delete event: {}", event);
            } else {
                int count = 0;
                for (int row = this.startingRowNumber; row < numRows; ++row) {
                    count += recordMaker.delete(rows.get(row), ts, row, numRows);
                }
                if (this.logger.isDebugEnabled()) {
                    if (this.startingRowNumber == 0) {
                        this.logger.debug("Recorded {} delete record(s) for event: {}", count, event);
                    } else {
                        this.logger.debug("Recorded {} delete record(s) for last {} row(s) in event: {}", count, numRows - this.startingRowNumber, event);
                    }
                }
            }
        }
        // The starting row only applies to the first row event handled.
        this.startingRowNumber = 0;
    }

    /**
     * Handles a view-change event. The event is only logged at debug level.
     *
     * @param event the event to log
     * @throws InterruptedException declared by the signature; this implementation does not throw it
     */
    protected void viewChange(Event event) throws InterruptedException {
        this.logger.debug("View Change event: {}", event);
    }

    /**
     * Handles an XA prepare event. The event is only logged at debug level.
     *
     * @param event the event to log
     * @throws InterruptedException declared by the signature; this implementation does not throw it
     */
    protected void prepareTransaction(Event event) throws InterruptedException {
        this.logger.debug("XA Prepare event: {}", event);
    }

    /**
     * Translates the connector's secure connection mode into the binlog client's
     * {@link SSLMode} constant of the same name.
     *
     * @param mode the connector-level mode; must not be {@code null}
     * @return the matching {@link SSLMode}, or {@code null} if the mode is not one of the handled constants
     */
    protected static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) {
        SSLMode result = null;
        switch (mode) {
            case DISABLED:
                result = SSLMode.DISABLED;
                break;
            case PREFERRED:
                result = SSLMode.PREFERRED;
                break;
            case REQUIRED:
                result = SSLMode.REQUIRED;
                break;
            case VERIFY_CA:
                result = SSLMode.VERIFY_CA;
                break;
            case VERIFY_IDENTITY:
                result = SSLMode.VERIFY_IDENTITY;
                break;
            default:
                break;
        }
        return result;
    }

    /**
     * Logs the current reader state at {@code ERROR} severity.
     */
    private void logReaderState() {
        logReaderState(Level.ERROR);
    }

    /**
     * Logs the last stored offset together with the binlog client's current
     * file name and position, at the requested severity.
     *
     * @param severity {@code WARN} and {@code DEBUG} log at that level; any other value logs at error level
     */
    private void logReaderState(Level severity) {
        final String message = "Error during binlog processing. Last offset stored = {}, binlog reader near position = {}";

        String position;
        if (this.client != null) {
            position = this.client.getBinlogFilename() + "/" + this.client.getBinlogPosition();
        } else {
            // No client has been created, so there is no position to report.
            position = "N/A";
        }

        switch (severity) {
            case WARN:
                this.logger.warn(message, this.lastOffset, position);
                break;
            case DEBUG:
                this.logger.debug(message, this.lastOffset, position);
                break;
            default:
                this.logger.error(message, this.lastOffset, position);
                break;
        }
    }

    /**
     * Returns the metrics object held by this reader.
     *
     * @return the value of the {@code metrics} field
     */
    protected BinlogReaderMetrics getMetrics() {
        return metrics;
    }

    /**
     * Returns the binlog client held by this reader.
     *
     * @return the value of the {@code client} field
     */
    protected BinaryLogClient getBinlogClient() {
        return client;
    }

    /**
     * Captures the binlog file name and position currently reported by the binlog client.
     *
     * @return a new {@link BinlogPosition} built from the client's file name and position
     */
    public BinlogPosition getCurrentBinlogPosition() {
        String binlogFilename = this.client.getBinlogFilename();
        long binlogPosition = this.client.getBinlogPosition();
        return new BinlogPosition(binlogFilename, binlogPosition);
    }

    /**
     * Builds the SSL socket factory used for the binlog connection.
     * <p>
     * If the connection context reports no SSL/TLS version, no factory is created.
     * When the {@code javax.net.ssl.keyStore} system property is set, the JKS keystore
     * it names is loaded (using {@code javax.net.ssl.keyStorePassword}) to obtain client
     * key managers.
     *
     * @param connectionContext source of the accepted TLS version and the configured SSL mode
     * @return a socket factory for the accepted TLS version, or {@code null} if that version is null or empty
     * @throws ConnectException if the keystore cannot be read or initialised
     */
    private SSLSocketFactory getBinlogSslSocketFactory(MySqlJdbcContext connectionContext) {
        String acceptedTlsVersion = connectionContext.getSessionVariableForSslVersion();
        if (!Strings.isNullOrEmpty(acceptedTlsVersion)) {
            SSLMode sslMode = BinlogReader.sslModeFor(connectionContext.sslMode());
            String password = System.getProperty("javax.net.ssl.keyStorePassword");
            String keyFilename = System.getProperty("javax.net.ssl.keyStore");
            KeyManager[] keyManagers = null;
            if (keyFilename != null) {
                char[] passwordArray = password == null ? null : password.toCharArray();
                try {
                    KeyStore ks = KeyStore.getInstance("JKS");
                    // KeyStore.load does not close the stream it is given, so close it here
                    // to avoid leaking a file descriptor.
                    try (FileInputStream keyStoreStream = new FileInputStream(keyFilename)) {
                        ks.load(keyStoreStream, passwordArray);
                    }
                    KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
                    kmf.init(ks, passwordArray);
                    keyManagers = kmf.getKeyManagers();
                }
                catch (IOException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | CertificateException e) {
                    throw new ConnectException("Could not load keystore", e);
                }
            }
            if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
                final KeyManager[] finalKMS = keyManagers;
                return new DefaultSSLSocketFactory(acceptedTlsVersion){

                    @Override
                    protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
                        // NOTE(review): for PREFERRED/REQUIRED the trust manager below accepts
                        // every certificate chain, i.e. the connection is encrypted but the
                        // server's identity is not verified.
                        sc.init(finalKMS, new TrustManager[]{new X509TrustManager(){

                            @Override
                            public void checkClientTrusted(X509Certificate[] x509Certificates, String s2) throws CertificateException {
                            }

                            @Override
                            public void checkServerTrusted(X509Certificate[] x509Certificates, String s2) throws CertificateException {
                            }

                            @Override
                            public X509Certificate[] getAcceptedIssuers() {
                                return new X509Certificate[0];
                            }
                        }}, null);
                    }
                };
            }
            // Remaining modes use the factory's default SSL context initialisation.
            return new DefaultSSLSocketFactory(acceptedTlsVersion);
        }
        return null;
    }

    /**
     * Lifecycle listener registered on the {@link BinaryLogClient}. It logs connection
     * state changes and routes communication and deserialization failures to the
     * enclosing reader.
     */
    protected final class ReaderThreadLifecycleListener
    implements BinaryLogClient.LifecycleListener {
        protected ReaderThreadLifecycleListener() {
        }

        /**
         * Logs how many events were read and the last recorded offset, if any.
         * Nothing is done when info logging is disabled.
         */
        @Override
        public void onDisconnect(BinaryLogClient client) {
            if (!BinlogReader.this.logger.isInfoEnabled()) {
                return;
            }
            BinlogReader.this.context.temporaryLoggingContext("binlog", () -> {
                // Read the offset once so the null check and the log call see the same value.
                Map<?, ?> lastRecorded = BinlogReader.this.lastOffset;
                if (lastRecorded == null) {
                    BinlogReader.this.logger.info("Stopped reading binlog after {} events, no new offset was recorded", BinlogReader.this.totalRecordCounter);
                } else {
                    BinlogReader.this.logger.info("Stopped reading binlog after {} events, last recorded offset: {}", BinlogReader.this.totalRecordCounter, lastRecorded);
                }
            });
        }

        /**
         * Configures the "binlog" logging context and logs the host, port and starting point.
         */
        @Override
        public void onConnect(BinaryLogClient client) {
            BinlogReader.this.context.configureLoggingContext("binlog");
            BinlogReader.this.logger.info("Connected to MySQL binlog at {}:{}, starting at {}",
                    BinlogReader.this.connectionContext.hostname(),
                    BinlogReader.this.connectionContext.port(),
                    BinlogReader.this.source);
        }

        /**
         * Logs the reader state, attempts to disconnect the client, and marks the reader as failed.
         */
        @Override
        public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
            BinlogReader.this.logger.debug("A communication failure event arrived", ex);
            BinlogReader.this.logReaderState();
            try {
                client.disconnect();
            }
            catch (Exception disconnectFailure) {
                // Best effort only: the original failure is the one reported below.
                BinlogReader.this.logger.debug("Exception while closing client", disconnectFailure);
            }
            BinlogReader.this.failed(ex);
        }

        /**
         * Applies the configured deserialization failure handling mode: {@code FAIL} marks the
         * reader as failed, {@code WARN} logs a warning, any other mode logs at debug level.
         */
        @Override
        public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
            CommonConnectorConfig.EventProcessingFailureHandlingMode mode = BinlogReader.this.eventDeserializationFailureHandlingMode;
            if (mode == CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL) {
                BinlogReader.this.logger.debug("A deserialization failure event arrived", ex);
                BinlogReader.this.logReaderState();
                BinlogReader.this.failed(ex);
                return;
            }
            if (mode == CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN) {
                BinlogReader.this.logger.warn("A deserialization failure event arrived", ex);
                BinlogReader.this.logReaderState(Level.WARN);
                return;
            }
            BinlogReader.this.logger.debug("A deserialization failure event arrived", ex);
            BinlogReader.this.logReaderState(Level.DEBUG);
        }
    }

    /**
     * Immutable pairing of a binlog file name with a position within that file.
     * Two instances are equal when both the file name and the position match.
     */
    public static class BinlogPosition {
        final String filename;
        final long position;

        /**
         * @param filename the binlog file name; asserted to be non-null
         * @param position the position within the file
         */
        public BinlogPosition(String filename, long position) {
            assert (filename != null);
            this.filename = filename;
            this.position = position;
        }

        /** @return the binlog file name */
        public String getFilename() {
            return filename;
        }

        /** @return the position within the binlog file */
        public long getPosition() {
            return position;
        }

        /** @return the file name and position joined as {@code filename/position} */
        @Override
        public String toString() {
            return filename + "/" + position;
        }

        @Override
        public int hashCode() {
            int fileHash = filename.hashCode();
            // Long.hashCode folds the upper 32 bits into the lower 32 bits.
            int positionHash = Long.hashCode(position);
            return 31 * (31 + fileHash) + positionHash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            BinlogPosition that = (BinlogPosition) obj;
            return filename.equals(that.filename) && position == that.position;
        }
    }
}

