/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;

public abstract class AbstractSnapshotTest<T extends SourceConnector>
extends AbstractAsyncEngineConnectorTest {
    protected static final int ROW_COUNT = 1000;
    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-schema-history-is.txt").toAbsolutePath();
    protected static final int PARTITION_NO = 0;
    protected static final String SERVER_NAME = "test_server";
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5;
    protected final Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt");

    protected abstract Class<T> connectorClass();

    protected abstract JdbcConnection databaseConnection();

    protected abstract String topicName();

    protected abstract String tableName();

    protected abstract List<String> topicNames();

    protected abstract List<String> tableNames();

    protected abstract String signalTableName();

    protected String signalTableNameSanitized() {
        return this.signalTableName();
    }

    protected abstract Configuration.Builder config();

    protected abstract Configuration.Builder mutableConfig(boolean var1, boolean var2);

    protected abstract String connector();

    protected abstract String server();

    protected String task() {
        return null;
    }

    protected String database() {
        return null;
    }

    protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
    }

    protected String alterTableAddColumnStatement(String tableName) {
        return "ALTER TABLE " + tableName + " add col3 int default 0";
    }

    protected String alterTableDropColumnStatement(String tableName) {
        return "ALTER TABLE " + tableName + " drop column col3";
    }

    protected String tableDataCollectionId() {
        return this.tableName();
    }

    protected List<String> tableDataCollectionIds() {
        return this.tableNames();
    }

    protected void populateTable(JdbcConnection connection, String tableName) throws SQLException {
        connection.setAutoCommit(false);
        for (int i = 0; i < 1000; ++i) {
            connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableName, connection.quotedColumnIdString(this.pkFieldName()), i + 1, i)});
        }
        connection.commit();
    }

    protected void populateTable(JdbcConnection connection) throws SQLException {
        this.populateTable(connection, this.tableName());
    }

    protected void populateTables(JdbcConnection connection) throws SQLException {
        for (String tableName : this.tableNames()) {
            this.populateTable(connection, tableName);
        }
    }

    protected void populateTable() throws SQLException {
        try (JdbcConnection connection = this.databaseConnection();){
            this.populateTable(connection);
        }
    }

    protected void populateTable(String table) throws SQLException {
        try (JdbcConnection connection = this.databaseConnection();){
            this.populateTable(connection, table);
        }
    }

    protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException {
        try (JdbcConnection connection = this.databaseConnection();){
            this.populateTableWithSpecificValue(connection, this.tableName(), startRow, count, value);
        }
    }

    private void populateTableWithSpecificValue(JdbcConnection connection, String tableName, int startRow, int count, int value) throws SQLException {
        connection.setAutoCommit(false);
        for (int i = startRow + 1; i <= startRow + count; ++i) {
            connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableName, connection.quotedColumnIdString(this.pkFieldName()), count + i, value)});
        }
        connection.commit();
    }

    protected void populateTables() throws SQLException {
        try (JdbcConnection connection = this.databaseConnection();){
            this.populateTables(connection);
        }
    }

    protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException {
        connection.setAutoCommit(false);
        for (int i = 0; i < 1000; ++i) {
            int id = i + 1;
            int pk1 = id / 1000;
            int pk2 = id / 100 % 10;
            int pk3 = id / 10 % 10;
            int pk4 = id % 10;
            connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)", tableName, pk1, pk2, pk3, pk4, i)});
        }
        connection.commit();
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, this.topicName());
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName()), x -> true, null, topicName);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Function<SourceRecord, V> valueConverter, Predicate<Map.Entry<Integer, V>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer, String topicName) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(this.pkFieldName()), valueConverter, topicName, recordConsumer);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, V>> dataCompleted, Function<Struct, Integer> idCalculator, Function<SourceRecord, V> valueConverter, String topicName, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, idCalculator, valueConverter, topicName, recordConsumer, true);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, V>> dataCompleted, Function<Struct, Integer> idCalculator, Function<SourceRecord, V> valueConverter, String topicName, Consumer<List<SourceRecord>> recordConsumer, boolean assertRecords) throws InterruptedException {
        HashMap dbChanges = new HashMap();
        int noRecords = 0;
        while (true) {
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1, assertRecords);
            List<SourceRecord> dataRecords = records.recordsForTopic(topicName);
            if (records.allRecordsInOrder().isEmpty()) {
                ((AbstractIntegerAssert)Assertions.assertThat((int)(++noRecords)).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount), new Object[0])).isLessThanOrEqualTo(5);
                continue;
            }
            noRecords = 0;
            if (dataRecords == null || dataRecords.isEmpty()) continue;
            dataRecords.forEach(record -> {
                int id = (Integer)idCalculator.apply((Struct)record.key());
                Object value = valueConverter.apply((SourceRecord)record);
                dbChanges.put(id, value);
            });
            if (recordConsumer != null) {
                recordConsumer.accept(dataRecords);
            }
            if (dbChanges.size() >= recordCount && !dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) break;
        }
        Assertions.assertThat(dbChanges).hasSize(recordCount);
        return dbChanges;
    }

    protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null, this.topicName());
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, Integer>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName()), dataCompleted, recordConsumer, this.topicName());
    }

    protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, SourceRecord>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer, this.topicName());
    }

    protected String valueFieldName() {
        return "aa";
    }

    protected String pkFieldName() {
        return "pk";
    }

    protected void startConnector(DebeziumEngine.CompletionCallback callback) {
        this.startConnector(Function.identity(), callback, true);
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        this.startConnector(custConfig, this.loggingCompletion(), true);
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig, DebeziumEngine.CompletionCallback callback, boolean expectNoRecords) {
        Configuration config = custConfig.apply(this.config()).build();
        this.start(this.connectorClass(), config, callback);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(AbstractSnapshotTest.waitTimeForRecords(), TimeUnit.SECONDS);
        if (expectNoRecords) {
            this.assertNoRecordsToConsume();
        }
    }

    protected void startConnectorWithSnapshot(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        this.startConnector(custConfig, this.loggingCompletion(), false);
    }

    protected void startConnector() {
        this.startConnector(Function.identity(), this.loggingCompletion(), true);
    }

    protected void waitForConnectorToStart() {
        this.assertConnectorIsRunning();
    }

    protected Function<Struct, Integer> getRecordValue() {
        return s -> s.getStruct("after").getInt32(this.valueFieldName());
    }

    @Override
    protected int getMaximumEnqueuedRecordCount() {
        return 3000;
    }

    protected void sendExecuteSnapshotFileSignal(String fullTableNames) throws IOException {
        this.sendExecuteSnapshotFileSignal(fullTableNames, "INCREMENTAL", this.signalsFile);
    }

    protected void sendExecuteSnapshotFileSignal(String fullTableNames, String type, Path signalFile) throws IOException {
        String signalValue = String.format("{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"%s\"}}", fullTableNames, type);
        Files.write(signalFile, signalValue.getBytes(), new OpenOption[0]);
    }

    protected void sendAdHocSnapshotSignal(String ... dataCollectionIds) throws SQLException {
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", dataCollectionIds);
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String additionalCondition, String surrogateKey, String ... dataCollectionIds) {
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, dataCollectionIds);
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> additionalConditions, String surrogateKey, String ... dataCollectionIds) {
        this.sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(additionalConditions, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, dataCollectionIds);
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String additionalCondition, String surrogateKey, AbstractSnapshotSignal.SnapshotType snapshotType, String ... dataCollectionIds) {
        String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\"" + x + "\"").collect(Collectors.joining(", "));
        try (JdbcConnection connection = this.databaseConnection();){
            String query = !Strings.isNullOrEmpty((String)additionalCondition) && !Strings.isNullOrEmpty((String)surrogateKey) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition, surrogateKey) : (!Strings.isNullOrEmpty((String)additionalCondition) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition) : (!Strings.isNullOrEmpty((String)surrogateKey) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey) : String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList)));
            this.logger.info("Sending signal with query {}", (Object)query);
            connection.execute(new String[]{query});
        }
        catch (Exception e) {
            this.logger.warn("Failed to send signal", (Throwable)e);
        }
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> additionalConditions, String surrogateKey, AbstractSnapshotSignal.SnapshotType snapshotType, String ... dataCollectionIds) {
        String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\"" + x + "\"").collect(Collectors.joining(", "));
        try (JdbcConnection connection = this.databaseConnection();){
            String query = !additionalConditions.isEmpty() && !Strings.isNullOrEmpty((String)surrogateKey) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s], \"surrogate-key\": %s}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList, AbstractSnapshotTest.buildAdditionalConditions(additionalConditions), surrogateKey) : (!additionalConditions.isEmpty() ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s]}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList, AbstractSnapshotTest.buildAdditionalConditions(additionalConditions)) : (!Strings.isNullOrEmpty((String)surrogateKey) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey) : String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", this.signalTableName(), snapshotType.toString(), dataCollectionIdsList)));
            this.logger.info("Sending signal with query {}", (Object)query);
            connection.execute(new String[]{query});
        }
        catch (Exception e) {
            this.logger.warn("Failed to send signal", (Throwable)e);
        }
    }

    protected static String buildAdditionalConditions(Map<String, String> additionalConditions) {
        return additionalConditions.entrySet().stream().map(cond -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", cond.getKey(), cond.getValue())).collect(Collectors.joining(","));
    }
}

