/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms;

import io.debezium.doc.FixFor;
import io.debezium.transforms.ByLogicalTableRouter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Unit tests for {@link ByLogicalTableRouter}.
 *
 * <p>The tests cover three areas: rejection of incomplete configurations, re-routing of
 * records to a replacement topic (optionally adding an extra field to the record key), and
 * pass-through handling of records whose key is {@code null} or whose value must not be rewritten.
 */
public class ByLogicalTableRouterTest {

    /**
     * A {@code key.field.regex} without any {@code key.field.replacement} must be rejected
     * by {@code configure} with a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void testBrokenKeyReplacementConfigurationNullValue() {
        ByLogicalTableRouter<SourceRecord> subject = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "someValidRegex(.*)");
        props.put("topic.replacement", "$1");
        props.put("key.field.regex", "If this is set, key.field.replacement must be non-empty");
        subject.configure(props);
    }

    /**
     * A {@code key.field.regex} combined with an empty {@code key.field.replacement} must be
     * rejected by {@code configure} with a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void testBrokenKeyReplacementConfigurationEmptyValue() {
        ByLogicalTableRouter<SourceRecord> subject = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "someValidRegex(.*)");
        props.put("topic.replacement", "$1");
        props.put("key.field.regex", "If this is set, key.field.replacement must be non-empty");
        props.put("key.field.replacement", "");
        subject.configure(props);
    }

    /**
     * With {@code schema.name.adjustment.mode=avro} the key schema name is expected to have
     * its dashes replaced by underscores.
     */
    @Test
    public void testKeyReplacementWorkingConfigurationWithAvroSchemaNameAdjustment() {
        testKeyReplacementWorkingConfiguration("avro", "mysql_server_1.inventory.customers_all_shards.Key");
    }

    /**
     * With {@code schema.name.adjustment.mode=none} the key schema name is expected to keep
     * the dashes of the topic name.
     */
    @Test
    @FixFor("DBZ-3535")
    public void testKeyReplacementWorkingConfigurationWithNoSchemaNameAdjustment() {
        testKeyReplacementWorkingConfiguration("none", "mysql-server-1.inventory.customers_all_shards.Key");
    }

    /**
     * Routes three records from two shard topics through one router and checks topic, key
     * schema and key contents of each result.
     *
     * @param schemaNameAdjustmentMode value passed as {@code schema.name.adjustment.mode}
     * @param expectedKeySchemaName key schema name every transformed record must carry
     */
    private void testKeyReplacementWorkingConfiguration(String schemaNameAdjustmentMode, String expectedKeySchemaName) {
        ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "(.*)customers_shard(.*)");
        props.put("topic.replacement", "$1customers_all_shards");
        props.put("key.field.name", "shard_id");
        props.put("key.field.regex", "(.*)customers_shard_(.*)");
        props.put("key.field.replacement", "$2");
        props.put("schema.name.adjustment.mode", schemaNameAdjustmentMode);
        router.configure(props);

        Schema keySchema = SchemaBuilder.struct()
                .name("mysql-server-1.inventory.customers_shard_1.Key")
                .field("id", SchemaBuilder.int64().build())
                .build();

        // Same id on two different shards, then a different id on an already seen shard.
        assertShardRouting(router, keySchema, "mysql-server-1.inventory.customers_shard_1", 123L, expectedKeySchemaName, "1");
        assertShardRouting(router, keySchema, "mysql-server-1.inventory.customers_shard_2", 123L, expectedKeySchemaName, "2");
        assertShardRouting(router, keySchema, "mysql-server-1.inventory.customers_shard_2", 456L, expectedKeySchemaName, "2");
    }

    /**
     * Sends a key-only record with the given {@code id} from {@code sourceTopic} through the
     * router and asserts that it lands on the combined shard topic with a two-field key
     * ({@code id}, {@code shard_id}).
     */
    private static void assertShardRouting(ByLogicalTableRouter<SourceRecord> router, Schema keySchema,
                                           String sourceTopic, long id, String expectedKeySchemaName,
                                           String expectedShardId) {
        Struct key = new Struct(keySchema).put("id", id);
        SourceRecord transformed = router.apply(keyOnlyRecord(sourceTopic, keySchema, key));

        Assertions.assertThat(transformed).isNotNull();
        Assertions.assertThat(transformed.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards");
        Assertions.assertThat(transformed.keySchema().name()).isEqualTo(expectedKeySchemaName);
        Assertions.assertThat(transformed.keySchema().fields()).hasSize(2);
        Assertions.assertThat(transformed.keySchema().fields().get(0).name()).isEqualTo("id");
        Assertions.assertThat(transformed.keySchema().fields().get(1).name()).isEqualTo("shard_id");

        Struct transformedKey = (Struct) transformed.key();
        Assertions.assertThat(transformedKey.get("id")).isEqualTo(id);
        Assertions.assertThat(transformedKey.get("shard_id")).isEqualTo(expectedShardId);
    }

    /**
     * Heartbeat records from different servers are routed to one central topic; the original
     * topic name is added to the key under the configured {@code key.field.name}.
     */
    @Test
    public void testHeartbeatMessageTopicReplacementTopic() {
        ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        String keyFieldName = "serverName";
        String keyOriginalKeyTopic = "originalTopic";
        String replacedTopic = "debezium_heartbeat_central";
        props.put("topic.regex", "__debezium-heartbeat(.*)");
        props.put("topic.replacement", replacedTopic);
        props.put("key.field.name", keyOriginalKeyTopic);
        router.configure(props);

        Schema keySchema = SchemaBuilder.struct()
                .name("io.debezium.connector.mysql.ServerNameKey")
                .field(keyFieldName, Schema.STRING_SCHEMA)
                .build();

        assertHeartbeatRouting(router, keySchema, keyFieldName, keyOriginalKeyTopic, replacedTopic, "test_server_name_db");
        assertHeartbeatRouting(router, keySchema, keyFieldName, keyOriginalKeyTopic, replacedTopic, "test_server_name_db_2");
    }

    /**
     * Routes one heartbeat record for {@code serverName} (topic
     * {@code __debezium-heartbeat.<serverName>}) and asserts the rewritten topic, key schema,
     * key contents and the untouched {@code null} value.
     */
    private static void assertHeartbeatRouting(ByLogicalTableRouter<SourceRecord> router, Schema keySchema,
                                               String keyFieldName, String originalTopicFieldName,
                                               String replacedTopic, String serverName) {
        String heartbeatTopic = "__debezium-heartbeat." + serverName;
        Struct key = new Struct(keySchema).put(keyFieldName, serverName);
        SourceRecord transformed = router.apply(keyOnlyRecord(heartbeatTopic, keySchema, key));

        Assertions.assertThat(transformed).isNotNull();
        Assertions.assertThat(transformed.topic()).isEqualTo(replacedTopic);
        Assertions.assertThat(transformed.keySchema().name()).isEqualTo(replacedTopic + ".Key");
        Assertions.assertThat(transformed.keySchema().fields()).hasSize(2);
        Assertions.assertThat(transformed.keySchema().fields().get(0).name()).isEqualTo(keyFieldName);
        Assertions.assertThat(transformed.keySchema().fields().get(1).name()).isEqualTo(originalTopicFieldName);

        Struct transformedKey = (Struct) transformed.key();
        Assertions.assertThat(transformedKey.get(keyFieldName)).isEqualTo(serverName);
        Assertions.assertThat(transformedKey.get(originalTopicFieldName)).isEqualTo(heartbeatTopic);
        Assertions.assertThat(transformed.value()).isNull();
    }

    /**
     * A {@code topic.regex} without any {@code topic.replacement} must be rejected by
     * {@code configure} with a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void testBrokenTopicReplacementConfigurationNullValue() {
        ByLogicalTableRouter<SourceRecord> subject = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "someValidRegex(.*)");
        subject.configure(props);
    }

    /**
     * A {@code topic.regex} combined with an empty {@code topic.replacement} must be rejected
     * by {@code configure} with a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void testBrokenTopicReplacementConfigurationEmptyValue() {
        ByLogicalTableRouter<SourceRecord> subject = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "someValidRegex(.*)");
        props.put("topic.replacement", "");
        subject.configure(props);
    }

    /**
     * A record without key and key schema is still re-routed, and both stay {@code null}.
     */
    @Test
    @FixFor("DBZ-1086")
    public void testKeyNullValue() {
        ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "(.*)customers_shard(.*)");
        props.put("topic.replacement", "$1customers_all_shards");
        props.put("key.field.name", "shard_id");
        props.put("key.field.regex", "(.*)customers_shard_(.*)");
        props.put("key.field.replacement", "$2");
        router.configure(props);

        SourceRecord transformed = router.apply(keyOnlyRecord("mysql-server-1.inventory.customers_shard_1", null, null));

        Assertions.assertThat(transformed).isNotNull();
        Assertions.assertThat(transformed.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards");
        Assertions.assertThat(transformed.keySchema()).isNull();
        Assertions.assertThat(transformed.key()).isNull();
    }

    /**
     * With {@code key.enforce.uniqueness=false} the key keeps its single {@code id} field and
     * only the key schema name follows the replacement topic, for records of two different shards.
     */
    @Test
    @FixFor("DBZ-2034")
    public void testNamespaceReplacementWithoutKeyChange() {
        ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "(.*).dbz_shard_\\d+.(.*)");
        props.put("topic.replacement", "dbz_core.$2");
        props.put("key.enforce.uniqueness", "false");
        router.configure(props);

        assertNamespaceRouting(router, "s21.dbz_shard_21.address");
        // The second record gets its own key schema AND a key Struct built from that schema,
        // so the second shard's schema is really the one being rewritten by the router.
        assertNamespaceRouting(router, "s22.dbz_shard_22.address");
    }

    /**
     * Builds a single-field key whose schema is named {@code <sourceTopic>.Key}, routes it and
     * asserts that topic and key schema name are rewritten while the key content is unchanged.
     */
    private static void assertNamespaceRouting(ByLogicalTableRouter<SourceRecord> router, String sourceTopic) {
        Schema keySchema = SchemaBuilder.struct()
                .name(sourceTopic + ".Key")
                .field("id", SchemaBuilder.int64().build())
                .build();
        Struct key = new Struct(keySchema).put("id", 123L);
        SourceRecord transformed = router.apply(keyOnlyRecord(sourceTopic, keySchema, key));

        Assertions.assertThat(transformed).isNotNull();
        Assertions.assertThat(transformed.topic()).isEqualTo("dbz_core.address");
        Assertions.assertThat(transformed.keySchema().name()).isEqualTo("dbz_core.address.Key");
        Assertions.assertThat(transformed.keySchema().fields()).hasSize(1);
        Assertions.assertThat(transformed.keySchema().fields().get(0).name()).isEqualTo("id");
        Assertions.assertThat(((Struct) transformed.key()).get("id")).isEqualTo(123L);
    }

    /**
     * A schema change record is re-routed to the replacement topic while its value is handed
     * through as the very same object.
     */
    @Test
    @FixFor("DBZ-2412")
    public void shouldHandleSchemaChangeEvent() throws Exception {
        ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
        HashMap<String, String> props = new HashMap<>();
        props.put("topic.regex", "(.*)");
        props.put("topic.replacement", "$1_rerouted");
        router.configure(props);

        Schema schemaChangeKeySchema = SchemaBuilder.struct()
                .name("io.debezium.connector.mysql.SchemaChangeKey")
                .field("databaseName", Schema.STRING_SCHEMA)
                .build();
        // The builder is used directly as the value schema (SchemaBuilder is itself a Schema).
        SchemaBuilder schemaChangeValueSchema = SchemaBuilder.struct()
                .name("io.debezium.connector.mysql.SchemaChangeValue")
                .field("databaseName", Schema.OPTIONAL_STRING_SCHEMA);

        Struct key = new Struct(schemaChangeKeySchema).put("databaseName", "my-db");
        Struct value = new Struct(schemaChangeValueSchema).put("databaseName", "my-db");
        SourceRecord record = new SourceRecord(
                null,
                Collections.singletonMap("offset", "1"),
                "schema_changes",
                Integer.valueOf(0),
                schemaChangeKeySchema,
                key,
                schemaChangeValueSchema,
                value);

        SourceRecord transformed = router.apply(record);

        Assertions.assertThat(transformed.topic()).isEqualTo("schema_changes_rerouted");
        Assertions.assertThat(transformed.value()).isSameAs(record.value());
    }

    /**
     * Creates a record on {@code topic} with empty source partition/offset maps, the given key
     * and key schema, and neither value nor value schema.
     */
    private static SourceRecord keyOnlyRecord(String topic, Schema keySchema, Object key) {
        return new SourceRecord(new HashMap<String, Object>(), new HashMap<String, Object>(), topic, keySchema, key, null, null);
    }
}

