/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.parquet;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.io.api.Binary;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class ParquetIOTest
implements Serializable {
    /**
     * Pipeline used by the tests to write the Parquet fixture files; a few tests also read the
     * files back within this same pipeline. Marked {@code transient} so it is excluded when the
     * enclosing (Serializable) test instance is serialized.
     */
    @Rule
    public transient TestPipeline mainPipeline = TestPipeline.create();

    /** Second pipeline used by the tests that read the files back after the write has finished. */
    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();

    /** Per-test scratch directory that receives the Parquet output files. */
    @Rule
    public transient TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Avro schema (JSON) of the records written by the tests: string fields {@code name} and {@code id}. */
    private static final String SCHEMA_STRING = "{\"type\":\"record\", \"name\":\"testrecord\",\"fields\":[    {\"name\":\"name\",\"type\":\"string\"},    {\"name\":\"id\",\"type\":\"string\"}  ]}";

    // Parsed from the string constant rather than from a second copy of the literal so the two can
    // never drift apart. The parser is fully qualified because the file also refers to Beam's Schema.
    private static final org.apache.avro.Schema SCHEMA =
        new org.apache.avro.Schema.Parser().parse(SCHEMA_STRING);

    /** Projection schema (JSON) that keeps only the {@code id} field. */
    private static final String REQUESTED_SCHEMA_STRING = "{\"type\":\"record\", \"name\":\"testrecord\",\"fields\":[    {\"name\":\"id\",\"type\":\"string\"}  ]}";

    /** Encoder schema (JSON) for projected reads: {@code name} is a string/null union, {@code id} a string. */
    private static final String REQUESTED_SCHEMA_ENCODER_STRING = "{\"type\":\"record\", \"name\":\"testrecord\",\"fields\":[    {\"name\":\"name\",\"type\":[\"string\",\"null\"]},    {\"name\":\"id\",\"type\":\"string\"}  ]}";

    private static final org.apache.avro.Schema REQUESTED_ENCODER_SCHEMA =
        new org.apache.avro.Schema.Parser().parse(REQUESTED_SCHEMA_ENCODER_STRING);

    private static final org.apache.avro.Schema REQUESTED_SCHEMA =
        new org.apache.avro.Schema.Parser().parse(REQUESTED_SCHEMA_STRING);

    /** Names cycled through when generating the {@code name} field of the test records. */
    private static final String[] SCIENTISTS = new String[]{"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};

    /**
     * Writes 1000 full records, reads them back with a projection ({@code REQUESTED_SCHEMA} with
     * {@code REQUESTED_ENCODER_SCHEMA} as encoder schema) and expects the records produced by
     * {@code generateRequestedRecords}.
     */
    @Test
    public void testWriteAndReadWithProjection() {
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();
        final List<GenericRecord> expected = generateRequestedRecords(1000L);
        final List<GenericRecord> written = generateGenericRecords(1000L);

        // Write phase.
        mainPipeline
            .apply(Create.of(written).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        // Read phase, projected.
        PCollection<GenericRecord> projected =
            readPipeline.apply(
                ParquetIO.read(SCHEMA)
                    .from(outputDir + "/*")
                    .withProjection(REQUESTED_SCHEMA, REQUESTED_ENCODER_SCHEMA));
        PAssert.that(projected).containsInAnyOrder(expected);
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Checks the progress reported by {@code BlockTracker} over the range [0, 1) before any claim
     * and after claiming offsets 0 and 1.
     */
    @Test
    public void testBlockTracker() {
        OffsetRange range = new OffsetRange(0L, 1L);
        ParquetIO.ReadFiles.BlockTracker tracker =
            new ParquetIO.ReadFiles.BlockTracker(range, 7L, 3L);

        // JUnit's contract is assertEquals(expected, actual, delta); keeping that order makes a
        // failure message report the right value as "expected".
        Assert.assertEquals(1.0, tracker.getProgress().getWorkRemaining(), 0.01);
        Assert.assertEquals(0.0, tracker.getProgress().getWorkCompleted(), 0.01);

        tracker.tryClaim(0L);
        tracker.tryClaim(1L);

        Assert.assertEquals(0.0, tracker.getProgress().getWorkRemaining(), 0.01);
        Assert.assertEquals(1.0, tracker.getProgress().getWorkCompleted(), 0.01);
    }

    /**
     * Exercises {@code SplitReadFn.splitBlockWithLimit} with an empty block list and with six
     * mocked blocks of 60 bytes each, split from index 1 with a limit of 200.
     */
    @Test
    public void testSplitBlockWithLimit() {
        ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
            new ParquetIO.ReadFiles.SplitReadFn<>(
                null, null, ParquetIO.GenericRecordPassthroughFn.create(), null);
        ArrayList<BlockMetaData> blockList = new ArrayList<>();
        BlockMetaData testBlock = Mockito.mock(BlockMetaData.class);
        Mockito.when(testBlock.getTotalByteSize()).thenReturn(60L);

        // No blocks: nothing to split.
        List<OffsetRange> rangeList =
            testFn.splitBlockWithLimit(0L, blockList.size(), blockList, 200L);
        Assert.assertTrue(rangeList.isEmpty());

        for (int i = 0; i < 6; ++i) {
            blockList.add(testBlock);
        }
        rangeList = testFn.splitBlockWithLimit(1L, blockList.size(), blockList, 200L);

        // Check the count first so that a wrong number of ranges fails as an assertion instead of
        // an IndexOutOfBoundsException from the get(...) calls below.
        Assert.assertEquals(2L, rangeList.size());
        Assert.assertEquals(1L, rangeList.get(0).getFrom());
        Assert.assertEquals(5L, rangeList.get(0).getTo());
        Assert.assertEquals(5L, rangeList.get(1).getFrom());
        Assert.assertEquals(6L, rangeList.get(1).getTo());
    }

    /** Round trip: writes 1000 records through the Parquet sink and reads the same records back. */
    @Test
    public void testWriteAndRead() {
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
            readPipeline.apply(ParquetIO.read(SCHEMA).from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        readPipeline.run().waitUntilFinish();
    }

    /** Same round trip as the plain write/read test, with the sink's row group size set to 1500. */
    @Test
    public void testWriteWithRowGroupSizeAndRead() {
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA).withRowGroupSize(1500))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
            readPipeline.apply(ParquetIO.read(SCHEMA).from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        readPipeline.run().waitUntilFinish();
    }

    /** Round trip where the read is configured with {@code withBeamSchemas(true)}. */
    @Test
    public void testWriteAndReadWithBeamSchema() {
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBackRecords =
            readPipeline.apply(
                ParquetIO.read(SCHEMA)
                    .from(outputDir + "/*")
                    .withBeamSchemas(true));
        PAssert.that(readBackRecords).containsInAnyOrder(records);
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Writes 1000 records, then reads the files via {@code parseGenericRecords} with the JSON
     * conversion function and compares against the JSON form of the written records.
     */
    @Test
    public void testWriteAndReadFilesAsJsonForUnknownSchema() {
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<String> readBackAsJson =
            readPipeline.apply(
                ParquetIO.parseGenericRecords(ParseGenericRecordAsJsonFn.create())
                    .from(outputDir + "/*"));
        PAssert.that(readBackAsJson).containsInAnyOrder(convertRecordsToJson(records));
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Single-pipeline round trip: the output file names of the write are matched, opened and fed
     * into {@code ParquetIO.readFiles}.
     */
    @Test
    public void testWriteAndReadFiles() {
        final List<GenericRecord> records = generateGenericRecords(1000L);

        PCollection<GenericRecord> writeThenRead =
            mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(
                    FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(SCHEMA))
                        .to(temporaryFolder.getRoot().getAbsolutePath()))
                .getPerDestinationOutputFilenames()
                .apply(Values.create())
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches())
                .apply(ParquetIO.readFiles(SCHEMA));

        PAssert.that(writeThenRead).containsInAnyOrder(records);
        mainPipeline.run().waitUntilFinish();
    }

    /**
     * Single-pipeline round trip that parses the written files with
     * {@code parseFilesGenericRecords} and the JSON conversion function.
     */
    @Test
    public void testReadFilesAsJsonForUnknownSchemaFiles() {
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final List<String> expectedJsonRecords = convertRecordsToJson(records);

        PCollection<String> writeThenRead =
            mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(
                    FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(SCHEMA))
                        .to(temporaryFolder.getRoot().getAbsolutePath()))
                .getPerDestinationOutputFilenames()
                .apply(Values.create())
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches())
                .apply(ParquetIO.parseFilesGenericRecords(ParseGenericRecordAsJsonFn.create()));

        Assert.assertEquals(1000L, expectedJsonRecords.size());
        PAssert.that(writeThenRead).containsInAnyOrder(expectedJsonRecords);
        mainPipeline.run().waitUntilFinish();
    }

    /**
     * Single-pipeline round trip that parses the written files into Beam rows via
     * {@code AvroUtils.toBeamRowStrict} and an explicit {@code SchemaCoder}.
     */
    @Test
    public void testReadFilesAsRowForUnknownSchemaFiles() {
        final List<GenericRecord> records = generateGenericRecords(1000L);

        // Expected rows are produced with the same conversion the pipeline applies.
        final List<org.apache.beam.sdk.values.Row> expectedRows = new ArrayList<>();
        for (GenericRecord source : records) {
            expectedRows.add(AvroUtils.toBeamRowStrict(source, null));
        }

        PCollection<org.apache.beam.sdk.values.Row> writeThenRead =
            mainPipeline
                .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
                .apply(
                    FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(SCHEMA))
                        .to(temporaryFolder.getRoot().getAbsolutePath()))
                .getPerDestinationOutputFilenames()
                .apply(Values.create())
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches())
                .apply(
                    ParquetIO.parseFilesGenericRecords(
                            (SerializableFunction<GenericRecord, org.apache.beam.sdk.values.Row>)
                                record -> AvroUtils.toBeamRowStrict(record, null))
                        .withCoder(SchemaCoder.of(AvroUtils.toBeamSchema(SCHEMA))));

        PAssert.that(writeThenRead).containsInAnyOrder(expectedRows);
        mainPipeline.run().waitUntilFinish();
    }

    /**
     * Expanding {@code parseFilesGenericRecords} with the pass-through function must be rejected
     * with an {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testReadFilesUnknownSchemaFilesForGenericRecordThrowException() {
        IllegalArgumentException thrown =
            Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                    ParquetIO.parseFilesGenericRecords(
                            ParquetIO.GenericRecordPassthroughFn.create())
                        .expand(null));

        Assert.assertEquals(
            "Parse can't be used for reading as GenericRecord.", thrown.getMessage());
    }

    /**
     * Builds {@code count} records against {@code SCHEMA}.
     *
     * <p>Record {@code i} gets {@code id = Integer.toString(i)} and a {@code name} taken from
     * {@code SCIENTISTS}, cycling through the array.
     *
     * @param count number of records to generate
     * @return the generated records, in index order
     */
    private List<GenericRecord> generateGenericRecords(long count) {
        final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);
        final ArrayList<GenericRecord> generated = new ArrayList<GenericRecord>();
        for (int i = 0; i < count; i++) {
            final String scientist = SCIENTISTS[i % SCIENTISTS.length];
            recordBuilder.set("name", scientist);
            recordBuilder.set("id", Integer.toString(i));
            generated.add(recordBuilder.build());
        }
        return generated;
    }

    /**
     * Builds {@code count} records against {@code REQUESTED_ENCODER_SCHEMA}, i.e. the shape the
     * projection test expects: {@code id = Integer.toString(i)} and {@code name} set to null.
     *
     * @param count number of records to generate
     * @return the generated records, in index order
     */
    private List<GenericRecord> generateRequestedRecords(long count) {
        final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(REQUESTED_ENCODER_SCHEMA);
        final ArrayList<GenericRecord> generated = new ArrayList<GenericRecord>();
        for (int i = 0; i < count; i++) {
            recordBuilder.set("id", Integer.toString(i));
            recordBuilder.set("name", null);
            generated.add(recordBuilder.build());
        }
        return generated;
    }

    /**
     * Verifies the display data items exposed by a configured {@code ParquetIO.read}: file
     * pattern, schema, beam-schema flag, projection schema, Avro data model and the custom
     * Hadoop configuration entry.
     */
    @Test
    public void testReadDisplayData() {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("parquet.foo", "foo");

        DisplayData displayData =
            DisplayData.from(
                ParquetIO.read(SCHEMA)
                    .from("foo.parquet")
                    .withProjection(REQUESTED_SCHEMA, SCHEMA)
                    .withAvroDataModel(GenericData.get())
                    .withConfiguration(hadoopConf));

        MatcherAssert.assertThat(
            displayData, DisplayDataMatchers.hasDisplayItem("filePattern", "foo.parquet"));
        MatcherAssert.assertThat(
            displayData, DisplayDataMatchers.hasDisplayItem("schema", SCHEMA.toString()));
        MatcherAssert.assertThat(
            displayData, DisplayDataMatchers.hasDisplayItem("inferBeamSchema", false));
        MatcherAssert.assertThat(
            displayData,
            DisplayDataMatchers.hasDisplayItem("projectionSchema", REQUESTED_SCHEMA.toString()));
        MatcherAssert.assertThat(
            displayData,
            DisplayDataMatchers.hasDisplayItem("avroDataModel", GenericData.get().toString()));
        MatcherAssert.assertThat(
            displayData, DisplayDataMatchers.hasDisplayItem("parquet.foo", "foo"));
    }

    /**
     * Writes records using the reflect-derived schema of {@code TestRecord}, then reads them back
     * without configuring an Avro data model and expects the read pipeline to fail with
     * {@link Pipeline.PipelineExecutionException}.
     */
    @Test
    public void testWriteAndReadUsingReflectDataSchemaWithoutDataModelThrowsException() {
        org.apache.avro.Schema testRecordSchema = ReflectData.get().getSchema(TestRecord.class);
        List<GenericRecord> records = generateGenericRecords(1000L);
        String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(testRecordSchema)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(testRecordSchema))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
            readPipeline.apply(ParquetIO.read(testRecordSchema).from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);

        // Scope the expectation to the read pipeline only. With @Test(expected = ...) on the whole
        // method, a failure of the write pipeline above would also have made this test pass.
        Assert.assertThrows(
            Pipeline.PipelineExecutionException.class,
            () -> readPipeline.run().waitUntilFinish());
    }

    /**
     * Round trip using the reflect-derived schema of {@code TestRecord}, with the read configured
     * to use {@code GenericData} as its Avro data model.
     */
    @Test
    public void testWriteAndReadUsingReflectDataSchemaWithDataModel() {
        final org.apache.avro.Schema testRecordSchema =
            ReflectData.get().getSchema(TestRecord.class);
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(testRecordSchema)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(testRecordSchema))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
            readPipeline.apply(
                ParquetIO.read(testRecordSchema)
                    .withAvroDataModel(GenericData.get())
                    .from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Round trip with a schema freshly parsed from {@code SCHEMA_STRING}, where both the sink and
     * the read are configured with {@code GenericData} as Avro data model.
     */
    @Test
    public void testWriteAndReadUsingGenericDataSchemaWithDataModel() {
        final org.apache.avro.Schema schema =
            new org.apache.avro.Schema.Parser().parse(SCHEMA_STRING);
        final List<GenericRecord> records = generateGenericRecords(1000L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(schema)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(schema).withAvroDataModel(GenericData.get()))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        PCollection<GenericRecord> readBack =
            readPipeline.apply(
                ParquetIO.read(schema)
                    .withAvroDataModel(GenericData.get())
                    .from(outputDir + "/*"));
        PAssert.that(readBack).containsInAnyOrder(records);
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Writes 10 records and reads them back with a Hadoop configuration carrying the filter
     * predicate {@code id == "0"}; only the first generated record is expected.
     */
    @Test
    public void testWriteAndReadWithConfiguration() {
        final List<GenericRecord> records = generateGenericRecords(10L);
        final List<GenericRecord> expectedRecords = generateGenericRecords(1L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        Configuration configuration = new Configuration();
        FilterPredicate idIsZero =
            FilterApi.eq(FilterApi.binaryColumn("id"), Binary.fromString("0"));
        ParquetInputFormat.setFilterPredicate(configuration, idIsZero);

        PCollection<GenericRecord> readBack =
            readPipeline.apply(
                ParquetIO.read(SCHEMA)
                    .from(outputDir + "/*")
                    .withConfiguration(configuration));
        PAssert.that(readBack).containsInAnyOrder(expectedRecords);
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Writes 10 records and parses them back as JSON with a Hadoop configuration carrying the
     * filter predicate {@code id == "0"}; only the JSON of the first generated record is expected.
     */
    @Test
    public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
        final List<GenericRecord> records = generateGenericRecords(10L);
        final List<GenericRecord> expectedRecords = generateGenericRecords(1L);
        final String outputDir = temporaryFolder.getRoot().getAbsolutePath();

        mainPipeline
            .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
            .apply(
                FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(SCHEMA))
                    .to(outputDir));
        mainPipeline.run().waitUntilFinish();

        Configuration configuration = new Configuration();
        FilterPredicate idIsZero =
            FilterApi.eq(FilterApi.binaryColumn("id"), Binary.fromString("0"));
        ParquetInputFormat.setFilterPredicate(configuration, idIsZero);

        PCollection<String> readBackAsJson =
            readPipeline.apply(
                ParquetIO.parseGenericRecords(ParseGenericRecordAsJsonFn.create())
                    .withConfiguration(configuration)
                    .from(outputDir + "/*"));
        PAssert.that(readBackAsJson).containsInAnyOrder(convertRecordsToJson(expectedRecords));
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Converts each record to its JSON string using a single {@code ParseGenericRecordAsJsonFn}.
     *
     * @param records records to convert
     * @return the JSON strings, in the same order as {@code records}
     */
    private static List<String> convertRecordsToJson(List<GenericRecord> records) {
        final ParseGenericRecordAsJsonFn toJson = ParseGenericRecordAsJsonFn.create();
        final List<String> jsonRecords = new ArrayList<>();
        for (GenericRecord record : records) {
            jsonRecords.add(toJson.apply(record));
        }
        return jsonRecords;
    }

    private static class ParseGenericRecordAsJsonFn
    implements SerializableFunction<GenericRecord, String> {
        private ParseGenericRecordAsJsonFn() {
        }

        public static ParseGenericRecordAsJsonFn create() {
            return new ParseGenericRecordAsJsonFn();
        }

        public String apply(GenericRecord input) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(input.getSchema(), (OutputStream)baos, true);
                new GenericDatumWriter(input.getSchema()).write((Object)input, (Encoder)jsonEncoder);
                jsonEncoder.flush();
            }
            catch (IOException ioException) {
                throw new RuntimeException("error converting record to JSON", ioException);
            }
            try {
                return baos.toString("UTF-8");
            }
            catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Minimal POJO with a single {@code name} field; the tests derive an Avro schema from it via
     * {@code ReflectData}.
     */
    public static class TestRecord {
        /** The only field of the record. */
        String name;

        /**
         * Creates a record holding the given name.
         *
         * @param name value stored in the {@code name} field
         */
        public TestRecord(String name) {
            this.name = name;
        }
    }
}

