/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.debezium;

import java.io.IOException;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

public abstract class TestAbstractDebeziumSource
extends UtilitiesTestBase {
    private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
    private final HoodieIngestionMetrics metrics = (HoodieIngestionMetrics)Mockito.mock(HoodieIngestionMetrics.class);
    private KafkaTestUtils testUtils;

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices();
    }

    @BeforeEach
    public void setUpKafkaTestUtils() {
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    @AfterEach
    public void tearDownKafkaTestUtils() {
        this.testUtils.teardown();
        this.testUtils = null;
    }

    @AfterAll
    public static void cleanupClass() throws IOException {
        UtilitiesTestBase.cleanUpUtilitiesTestServices();
    }

    private TypedProperties createPropsForJsonSource() {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.streamer.source.kafka.topic", this.testTopicName);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.streamer.schemaprovider.registry.url", "localhost");
        props.setProperty("hoodie.streamer.source.kafka.value.deserializer.class", StringDeserializer.class.getName());
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    protected abstract String getIndexName();

    protected abstract String getSourceClass();

    protected abstract String getSchema();

    protected abstract GenericRecord generateMetaFields(GenericRecord var1);

    protected abstract void validateMetaFields(Dataset<Row> var1);

    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testDebeziumEvents(Operation operation) throws Exception {
        String sourceClass = this.getSourceClass();
        this.testUtils.createTopic(this.testTopicName, 2);
        TypedProperties props = this.createPropsForJsonSource();
        MockSchemaRegistryProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc, this);
        SourceFormatAdapter debeziumSource = new SourceFormatAdapter(UtilHelpers.createSource((String)sourceClass, (TypedProperties)props, (JavaSparkContext)jsc, (SparkSession)sparkSession, (HoodieIngestionMetrics)this.metrics, (StreamContext)new DefaultStreamContext((SchemaProvider)schemaProvider, Option.empty())));
        this.testUtils.sendMessages(this.testTopicName, new String[]{this.generateDebeziumEvent(operation).toString()});
        InputBatch fetch = debeziumSource.fetchNewDataInRowFormat(Option.empty(), 10L);
        Assertions.assertEquals((long)1L, (long)((Dataset)fetch.getBatch().get()).count());
        String fieldPrefix = operation.equals((Object)Operation.DELETE) ? "before_" : "after_";
        Assertions.assertTrue((boolean)((Dataset)fetch.getBatch().get()).select("type", new String[0]).collectAsList().stream().allMatch(r -> r.getString(0).startsWith(fieldPrefix)));
        Assertions.assertTrue((boolean)((Dataset)fetch.getBatch().get()).select("type", new String[0]).collectAsList().stream().allMatch(r -> r.getString(0).startsWith(fieldPrefix)));
        this.validateMetaFields((Dataset<Row>)((Dataset)fetch.getBatch().get()));
    }

    @Test
    public void testDatasetRowSchemaWithoutData() throws Exception {
        String sourceClass = this.getSourceClass();
        this.testUtils.createTopic(this.testTopicName, 2);
        TypedProperties props = this.createPropsForJsonSource();
        MockSchemaRegistryProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc, this);
        SourceFormatAdapter debeziumSource = new SourceFormatAdapter(UtilHelpers.createSource((String)sourceClass, (TypedProperties)props, (JavaSparkContext)jsc, (SparkSession)sparkSession, (HoodieIngestionMetrics)this.metrics, (StreamContext)new DefaultStreamContext((SchemaProvider)schemaProvider, Option.empty())));
        InputBatch fetch = debeziumSource.fetchNewDataInRowFormat(Option.empty(), 10L);
        Dataset result = (Dataset)fetch.getBatch().get();
        Assertions.assertEquals((long)result.count(), (long)0L);
        Assertions.assertTrue((result.columns().length > 0 ? 1 : 0) != 0);
    }

    private GenericRecord generateDebeziumEvent(Operation op) {
        Schema schema = new Schema.Parser().parse(this.getSchema());
        String indexName = this.getIndexName().concat(".ghschema.gharchive.Value");
        GenericData.Record rec = new GenericData.Record(schema);
        rec.put("op", (Object)op.op);
        rec.put("ts_ms", (Object)100L);
        Schema.Field beforeField = schema.getField("before");
        Schema beforeSchema = (Schema)beforeField.schema().getTypes().get(beforeField.schema().getIndexNamed(indexName));
        GenericData.Record beforeRecord = new GenericData.Record(beforeSchema);
        beforeRecord.put("id", (Object)1);
        beforeRecord.put("date", (Object)"1/1/2020");
        beforeRecord.put("type", (Object)"before_type");
        beforeRecord.put("payload", (Object)"before_payload");
        beforeRecord.put("timestamp", (Object)1000L);
        rec.put("before", (Object)beforeRecord);
        Schema.Field afterField = schema.getField("after");
        Schema afterSchema = (Schema)afterField.schema().getTypes().get(afterField.schema().getIndexNamed(indexName));
        GenericData.Record afterRecord = new GenericData.Record(afterSchema);
        afterRecord.put("id", (Object)1);
        afterRecord.put("date", (Object)"1/1/2021");
        afterRecord.put("type", (Object)"after_type");
        afterRecord.put("payload", (Object)"after_payload");
        afterRecord.put("timestamp", (Object)3000L);
        rec.put("after", (Object)afterRecord);
        return this.generateMetaFields((GenericRecord)rec);
    }

    private static Stream<Arguments> testArguments() {
        return Stream.of(Arguments.arguments((Object[])new Object[]{Operation.INSERT}), Arguments.arguments((Object[])new Object[]{Operation.UPDATE}), Arguments.arguments((Object[])new Object[]{Operation.DELETE}));
    }

    private static enum Operation {
        INSERT("c"),
        UPDATE("u"),
        DELETE("d");

        public final String op;

        private Operation(String op) {
            this.op = op;
        }
    }

    private static class MockSchemaRegistryProvider
    extends SchemaRegistryProvider {
        private final String schema;

        public MockSchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc, TestAbstractDebeziumSource source) {
            super(props, jssc);
            this.schema = source.getSchema();
        }

        public String fetchSchemaFromRegistry(String registryUrl) {
            return this.schema;
        }
    }
}

