/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.HoodieStreamerUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class TestHoodieStreamerUtils
extends UtilitiesTestBase {
    private static final String SCHEMA_STRING = "{\"type\": \"record\",\"name\": \"rec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}]}";

    @BeforeAll
    public static void setupOnce() throws Exception {
        TestHoodieStreamerUtils.initTestServices();
    }

    private static Stream<Arguments> validRecordTypes() {
        Stream.Builder<Arguments> b = Stream.builder();
        b.add(Arguments.of((Object[])new Object[]{HoodieRecord.HoodieRecordType.SPARK}));
        b.add(Arguments.of((Object[])new Object[]{HoodieRecord.HoodieRecordType.AVRO}));
        return b.build();
    }

    @ParameterizedTest
    @MethodSource(value={"validRecordTypes"})
    public void testCreateHoodieRecordsWithError(HoodieRecord.HoodieRecordType recordType) {
        Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
        JavaRDD recordRdd = jsc.parallelize(Collections.singletonList(1)).map((Function & Serializable)i -> {
            GenericData.Record record = new GenericData.Record(schema);
            record.put(0, (Object)((long)i.intValue() * 1000L));
            record.put(1, (Object)("key" + i));
            record.put(2, (Object)("path" + i));
            record.put(3, null);
            record.put(4, (Object)"driver");
            return record;
        });
        HoodieStreamer.Config cfg = new HoodieStreamer.Config();
        cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
        TypedProperties props = new TypedProperties();
        SimpleSchemaProvider schemaProvider = new SimpleSchemaProvider(jsc, schema, props);
        BaseErrorTableWriter errorTableWriter = (BaseErrorTableWriter)Mockito.mock(BaseErrorTableWriter.class);
        ArgumentCaptor errorEventCaptor = ArgumentCaptor.forClass(JavaRDD.class);
        ((BaseErrorTableWriter)Mockito.doNothing().when((Object)errorTableWriter)).addErrorEvents((JavaRDD)errorEventCaptor.capture());
        HoodieStreamerUtils.createHoodieRecords((HoodieStreamer.Config)cfg, (TypedProperties)props, (Option)Option.of((Object)recordRdd), (SchemaProvider)schemaProvider, (HoodieRecord.HoodieRecordType)recordType, (boolean)false, (String)"000", (Option)Option.of((Object)errorTableWriter));
        List actualErrorEvents = ((JavaRDD)errorEventCaptor.getValue()).collect();
        ErrorEvent expectedErrorEvent = new ErrorEvent((Object)"{\"timestamp\": 1000, \"_row_key\": \"key1\", \"partition_path\": \"path1\", \"rider\": null, \"driver\": \"driver\"}", ErrorEvent.ErrorReason.RECORD_CREATION);
        Assertions.assertEquals(Collections.singletonList(expectedErrorEvent), (Object)actualErrorEvents);
    }
}

