/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.converters;

import io.debezium.config.Configuration;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.transforms.outbox.EventRouter;
import java.util.LinkedHashMap;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Connector-agnostic base for tests that feed a streamed outbox change event through the
 * {@link HeaderFrom} and {@link EventRouter} transformations and then hand the routed record to
 * the CloudEvents conversion check in {@link CloudEventsConverterTest}.
 *
 * <p>Concrete subclasses supply everything that is connector specific: the connector class, its
 * configuration, the database connection, and the SQL used to create and populate the table.
 *
 * @param <T> the source connector type under test
 */
public abstract class AbstractCloudEventsConverterTest<T extends SourceConnector> extends AbstractConnectorTest {

    /** Outbox event router applied to the record after the metadata headers have been added. */
    protected EventRouter<SourceRecord> outboxEventRouter;

    /** Transformation applied first, configured in {@link #beforeEach()} to copy value fields into headers. */
    protected HeaderFrom<SourceRecord> headerFrom;

    /** @return the connector class that {@link #beforeEach()} starts */
    protected abstract Class<T> getConnectorClass();

    /** @return the connector name passed to the CloudEvents verification */
    protected abstract String getConnectorName();

    /** @return the server name passed to the CloudEvents verification */
    protected abstract String getServerName();

    /** @return the connection used to run the insert statement; it is closed in {@link #afterEach()} */
    protected abstract JdbcConnection databaseConnection();

    /** @return the builder whose built configuration is used to start the connector */
    protected abstract Configuration.Builder getConfigurationBuilder();

    /** @return the topic from which the streamed record is read */
    protected abstract String topicName();

    /** @return the table name; not used by this base class itself */
    protected abstract String tableName();

    /**
     * Creates the table the test inserts into. Called first in {@link #beforeEach()}.
     *
     * @throws Exception if the table cannot be created
     */
    protected abstract void createTable() throws Exception;

    /**
     * Builds the SQL statement that inserts one event row.
     *
     * <p>NOTE(review): the parameter names below are inferred from the arguments the test in this
     * class passes (a UUID, {@code "UserCreated"}, {@code "User"}, {@code "10711fa5"}, a JSON
     * document and an empty string) and from its assertions on the routed topic and key; confirm
     * them against the concrete implementations.
     *
     * @param eventId presumably the unique id of the event
     * @param eventType presumably the event type
     * @param aggregateType presumably the aggregate type; the test expects the routed topic to end with it
     * @param aggregateId presumably the aggregate id; the test expects it as the routed record key
     * @param payloadJson presumably the event payload as a JSON string
     * @param additional extra statement content; the test passes an empty string
     * @return the insert statement to execute
     */
    protected abstract String createInsert(String eventId, String eventType, String aggregateType, String aggregateId, String payloadJson,
                                           String additional);

    /**
     * Blocks until the connector has started streaming. Called after the connector is started.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected abstract void waitForStreamingStarted() throws InterruptedException;

    /**
     * Creates the table, configures both transformations and starts the connector.
     *
     * @throws Exception if table creation or connector start-up fails
     */
    @Before
    public void beforeEach() throws Exception {
        createTable();

        headerFrom = new HeaderFrom.Value<>();
        LinkedHashMap<String, String> headerFromConfig = new LinkedHashMap<>();
        // Copy (rather than move) the "source", "op" and "transaction" value fields into
        // headers carrying the same names.
        headerFromConfig.put("fields", "source,op,transaction");
        headerFromConfig.put("headers", "source,op,transaction");
        headerFromConfig.put("operation", "copy");
        headerFromConfig.put("header.converter.schemas.enable", "true");
        headerFrom.configure(headerFromConfig);

        outboxEventRouter = new EventRouter<>();
        LinkedHashMap<String, String> outboxEventRouterConfig = new LinkedHashMap<>();
        // Presumably makes the router expand the JSON payload instead of keeping it as a plain
        // string (the test asserts the routed value is a Struct) - confirm against the router docs.
        outboxEventRouterConfig.put("table.expand.json.payload", "true");
        outboxEventRouter.configure(outboxEventRouterConfig);

        startConnector();
    }

    /**
     * Stops the connector, verifies nothing is left to consume and releases all resources.
     *
     * <p>The clean-up steps are chained with {@code finally} so that a failure in an earlier step
     * (for example an assertion error from {@code assertNoRecordsToConsume()}) no longer leaves
     * the database connection or the transformations open.
     *
     * @throws Exception if stopping the connector, the final assertion, or closing the connection fails
     */
    @After
    public void afterEach() throws Exception {
        try {
            stopConnector();
            assertNoRecordsToConsume();
        }
        finally {
            try {
                databaseConnection().close();
            }
            finally {
                closeTransformations();
            }
        }
    }

    /**
     * Routes a single inserted outbox row and checks that the result converts to a CloudEvent
     * in JSON whose metadata comes from the record headers.
     *
     * @throws Exception if the insert, the consumption or the conversion check fails
     */
    @Test
    @FixFor("DBZ-3642")
    public void shouldConvertToCloudEventsInJsonWithMetadataInHeadersAfterOutboxEventRouter() throws Exception {
        databaseConnection().execute(createInsert(
                "59a42efd-b015-44a9-9dde-cb36d9002425",
                "UserCreated",
                "User",
                "10711fa5",
                "{\"someField1\": \"some value 1\",\"someField2\": 7005}",
                ""));

        AbstractConnectorTest.SourceRecords streamingRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);

        SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
        // Order matters: the headers are added first, then the enriched record is routed.
        SourceRecord recordWithMetadataHeaders = headerFrom.apply(record);
        SourceRecord routedEvent = outboxEventRouter.apply(recordWithMetadataHeaders);

        Assertions.assertThat(routedEvent).isNotNull();
        Assertions.assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
        Assertions.assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
        Assertions.assertThat(routedEvent.key()).isEqualTo("10711fa5");
        Assertions.assertThat(routedEvent.value()).isInstanceOf(Struct.class);

        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataInHeaders(routedEvent, getConnectorName(), getServerName());
    }

    /**
     * Starts the connector with the subclass-provided configuration, waits until it is streaming
     * and asserts that no records are pending before the test body runs.
     */
    private void startConnector() throws Exception {
        Configuration.Builder configBuilder = getConfigurationBuilder();
        start(getConnectorClass(), configBuilder.build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        assertNoRecordsToConsume();
    }

    /**
     * Closes both transformations. Each may still be {@code null} when {@link #beforeEach()}
     * failed before creating it, because JUnit runs {@code @After} methods even then.
     */
    private void closeTransformations() {
        try {
            if (headerFrom != null) {
                headerFrom.close();
            }
        }
        finally {
            if (outboxEventRouter != null) {
                outboxEventRouter.close();
            }
        }
    }
}

