/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log.block;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.ByteArraySeekableDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/**
 * Tests for reading records back out of a {@link HoodieAvroDataBlock}.
 *
 * <p>Each test builds a block in one of three ways (from in-memory records, from
 * serialized content bytes, or from an input stream over serialized bytes), reads
 * it through {@code getRecordIterator}, and compares the result with the records
 * that were put in.
 */
public class TestHoodieAvroDataBlock {
    private static final String RECORD_KEY_FIELD = HoodieRecord.RECORD_KEY_METADATA_FIELD;
    private static final String SCHEMA_STRING = "{\n"
            + "  \"type\": \"record\",\n"
            + "  \"name\": \"RandomRecord\",\n"
            + "  \"fields\": [\n"
            + "    {\"name\": \"" + RECORD_KEY_FIELD + "\", \"type\": \"string\"},\n"
            + "    {\"name\": \"value\", \"type\": \"double\"}\n"
            + "  ]\n"
            + "}";
    private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);
    private static final HoodieLogBlock.HoodieLogBlockContentLocation NULL_BLOCK_CONTENT_LOCATION =
            new HoodieLogBlock.HoodieLogBlockContentLocation(null, null, 0L, 0L, 0L);

    /** Number of records generated for every round-trip test. */
    private static final int RECORD_COUNT = 1000;
    /** Buffer size handed to the iterator when a test asks for a streaming read. */
    private static final int STREAMING_BUFFER_SIZE = 100;

    /**
     * Reads from a block that was constructed directly from a list of records.
     *
     * @param useKeyFilter          whether to read through the key-filtering overload
     * @param keyFilterFullKeyMatch whether filter keys are matched exactly ({@code true})
     *                              or as prefixes ({@code false})
     */
    @ParameterizedTest
    @CsvSource(value = {"true, true", "true, false", "false, true", "false, false"})
    public void testGetRecordIteratorWithRecordsPresent(boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
        List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, RECORD_COUNT);
        HoodieAvroDataBlock block = createHoodieAvroDataBlock(SCHEMA, records);
        List<HoodieRecord> recordsForFilter = selectRandomRecords(records, keyFilterFullKeyMatch);
        List<String> keysForFilter = extractRecordKeys(recordsForFilter);
        // NOTE(review): assumes getRecordIterator is generic in the record payload type,
        // so HoodieRecord<Object> is a valid target - confirm against HoodieDataBlock.
        try (ClosableIterator<HoodieRecord<Object>> recordIterator = useKeyFilter
                ? block.getRecordIterator(keysForFilter, keyFilterFullKeyMatch, HoodieRecord.HoodieRecordType.AVRO)
                : block.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
            verifyRecords(useKeyFilter ? recordsForFilter : records, collectRecords(recordIterator));
        }
    }

    /**
     * Reads from a block that was constructed from already-serialized content bytes.
     *
     * @param useKeyFilter          whether to read through the key-filtering overload
     * @param keyFilterFullKeyMatch whether filter keys are matched exactly ({@code true})
     *                              or as prefixes ({@code false})
     */
    @ParameterizedTest
    @CsvSource(value = {"true, true", "true, false", "false, true", "false, false"})
    public void testGetRecordIteratorWithContent(boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
        List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, RECORD_COUNT);
        byte[] content = createHoodieAvroDataBlockContent(SCHEMA, records);
        HoodieAvroDataBlock dataBlock = createDataBlockFromContent(content);
        List<HoodieRecord> recordsForFilter = selectRandomRecords(records, keyFilterFullKeyMatch);
        List<String> keysForFilter = extractRecordKeys(recordsForFilter);
        try (ClosableIterator<HoodieRecord<Object>> recordIterator = useKeyFilter
                ? dataBlock.getRecordIterator(keysForFilter, keyFilterFullKeyMatch, HoodieRecord.HoodieRecordType.AVRO)
                : dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
            verifyRecords(useKeyFilter ? recordsForFilter : records, collectRecords(recordIterator));
        }
    }

    /**
     * Reads from a block that has no in-memory content and must pull its bytes from an input stream.
     *
     * @param useStreamingRead      when {@code true} a buffer size of {@value #STREAMING_BUFFER_SIZE} is passed,
     *                              otherwise {@code 0} (presumably selecting the non-streaming path - confirm)
     * @param useKeyFilter          whether to read through the key-filtering overload
     * @param keyFilterFullKeyMatch whether filter keys are matched exactly ({@code true})
     *                              or as prefixes ({@code false})
     */
    @ParameterizedTest
    @CsvSource(value = {"true, true, true", "true, true, false", "true, false, true", "true, false, false", "false, true, true", "false, true, false", "false, false, true", "false, false, false"})
    public void testGetRecordIteratorWithInputStream(boolean useStreamingRead, boolean useKeyFilter, boolean keyFilterFullKeyMatch) throws IOException {
        List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, RECORD_COUNT);
        byte[] blockContent = createHoodieAvroDataBlockContent(SCHEMA, records);
        HoodieAvroDataBlock dataBlock = createDataBlockFromStream(blockContent);
        int bufferSize = useStreamingRead ? STREAMING_BUFFER_SIZE : 0;
        List<HoodieRecord> recordsForFilter = selectRandomRecords(records, keyFilterFullKeyMatch);
        List<String> keysForFilter = extractRecordKeys(recordsForFilter);
        try (ClosableIterator<HoodieRecord<Object>> recordIterator = useKeyFilter
                ? dataBlock.getRecordIterator(keysForFilter, keyFilterFullKeyMatch, HoodieRecord.HoodieRecordType.AVRO, bufferSize)
                : dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize)) {
            verifyRecords(useKeyFilter ? recordsForFilter : records, collectRecords(recordIterator));
        }
    }

    /**
     * Reads every record from a stream-backed block across a range of buffer sizes,
     * including negative, zero and extreme values.
     *
     * @param bufferSize buffer size passed straight through to {@code getRecordIterator}
     */
    @ParameterizedTest
    @ValueSource(ints = {Integer.MIN_VALUE, -2, -1, 0, 1, 2, 100, 200, 300, 1024, Integer.MAX_VALUE})
    public void testGetRecordIteratorWithStreamingRead(int bufferSize) throws IOException {
        List<HoodieRecord> records = generateRandomHoodieRecords(SCHEMA, RECORD_COUNT);
        byte[] blockContent = createHoodieAvroDataBlockContent(SCHEMA, records);
        HoodieAvroDataBlock dataBlock = createDataBlockFromStream(blockContent);
        try (ClosableIterator<HoodieRecord<Object>> recordIterator =
                     dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize)) {
            verifyRecords(records, collectRecords(recordIterator));
        }
    }

    /**
     * A block whose content is a zero-length byte array must fail: {@code deserializeRecords}
     * with {@link EOFException} and {@code getRecordIterator} with {@link HoodieIOException}.
     */
    @Test
    public void testGetRecordIteratorWithEmptyContent() {
        byte[] content = new byte[0];
        HoodieAvroDataBlock avroDataBlock = createDataBlockFromContent(content);
        Assertions.assertThrows(EOFException.class,
                () -> avroDataBlock.deserializeRecords(content, HoodieRecord.HoodieRecordType.AVRO));
        Assertions.assertThrows(HoodieIOException.class,
                () -> avroDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO));
    }

    /**
     * A block with no content whose stream supplier yields {@code null} is expected to
     * throw {@link NullPointerException} for every buffer size.
     *
     * @param bufferSize buffer size passed straight through to {@code getRecordIterator}
     */
    @ParameterizedTest
    @ValueSource(ints = {Integer.MIN_VALUE, -2, -1, 0, 1, 2, 100, 200, 300, 1024, Integer.MAX_VALUE})
    public void testGetRecordIteratorWithEmptyContentAndInputStream(int bufferSize) throws IOException {
        HoodieAvroDataBlock avroDataBlock = new HoodieAvroDataBlock(
                () -> null, Option.empty(), true, NULL_BLOCK_CONTENT_LOCATION,
                Option.of(SCHEMA), schemaHeader(SCHEMA), new HashMap<>(), RECORD_KEY_FIELD);
        Assertions.assertThrows(NullPointerException.class,
                () -> avroDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO, bufferSize));
    }

    /**
     * Creates {@code recordCount} records keyed {@code key_0 .. key_(recordCount-1)}, each with a random double value.
     *
     * @param schema      Avro schema used to instantiate each record
     * @param recordCount number of records to create
     * @return the generated records, in key-index order
     */
    private static List<HoodieRecord> generateRandomHoodieRecords(Schema schema, int recordCount) {
        List<HoodieRecord> records = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < recordCount; ++i) {
            GenericData.Record record = new GenericData.Record(schema);
            String recordKey = "key_" + i;
            // Use the same constant the schema was built from so the field name cannot drift.
            record.put(RECORD_KEY_FIELD, recordKey);
            record.put("value", random.nextDouble());
            records.add(new HoodieAvroIndexedRecord(new HoodieKey(recordKey, ""), record));
        }
        return records;
    }

    /** Builds a block header that carries only the given schema under {@code SCHEMA}. */
    private static Map<HoodieLogBlock.HeaderMetadataType, String> schemaHeader(Schema schema) {
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        return header;
    }

    /** Builds a block directly from in-memory records. */
    private static HoodieAvroDataBlock createHoodieAvroDataBlock(Schema schema, List<HoodieRecord> records) {
        return new HoodieAvroDataBlock(records, schemaHeader(schema), RECORD_KEY_FIELD);
    }

    /**
     * Serializes the records into block content bytes via {@code getContentBytes}, using a mocked storage.
     *
     * @return the serialized block content
     */
    private static byte[] createHoodieAvroDataBlockContent(Schema schema, List<HoodieRecord> records) throws IOException {
        HoodieStorage storage = Mockito.mock(HoodieStorage.class);
        // The cast keeps the stubbing typed as Object, so it does not depend on the
        // generic return type of getConf(), which is not visible from this file.
        Mockito.when((Object) storage.getConf()).thenReturn(Mockito.mock(StorageConfiguration.class));
        return createHoodieAvroDataBlock(schema, records).getContentBytes(storage).toByteArray();
    }

    /** Builds a block around pre-serialized content, with a stream supplier that yields {@code null}. */
    private static HoodieAvroDataBlock createDataBlockFromContent(byte[] content) {
        Option<byte[]> contentOpt = Option.of(content);
        return new HoodieAvroDataBlock(
                () -> null, contentOpt, false, NULL_BLOCK_CONTENT_LOCATION,
                Option.of(SCHEMA), schemaHeader(SCHEMA), new HashMap<>(), RECORD_KEY_FIELD);
    }

    /**
     * Builds a block with no in-memory content whose supplier returns a single stream over
     * {@code blockContent}; the content location's last two arguments are both set to the content length.
     */
    private static HoodieAvroDataBlock createDataBlockFromStream(byte[] blockContent) throws IOException {
        SeekableDataInputStream inputStream = createSeekableDataInputStream(blockContent);
        HoodieLogBlock.HoodieLogBlockContentLocation location = new HoodieLogBlock.HoodieLogBlockContentLocation(
                null, null, 0L, (long) blockContent.length, (long) blockContent.length);
        return new HoodieAvroDataBlock(
                () -> inputStream, Option.empty(), true, location,
                Option.of(SCHEMA), schemaHeader(SCHEMA), new HashMap<>(), RECORD_KEY_FIELD);
    }

    /** Wraps the bytes in a seekable data input stream. */
    private static SeekableDataInputStream createSeekableDataInputStream(byte[] content) throws IOException {
        return new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(content));
    }

    /**
     * Picks {@code records.size() / 4} random indices (repeats collapse in a set) and returns,
     * in original order, every record matching one of the picked keys.
     *
     * @param records candidate records
     * @param fullKey {@code true} to match a picked key exactly; {@code false} to match any
     *                record whose key starts with a picked key
     * @return the matching records; empty when {@code records} is empty
     */
    private static List<HoodieRecord> selectRandomRecords(List<HoodieRecord> records, boolean fullKey) {
        if (records.isEmpty()) {
            // Random.ints(n, 0, 0) rejects an empty range, so there is nothing to sample.
            return new ArrayList<>();
        }
        Set<String> keys = new Random().ints(records.size() / 4, 0, records.size())
                .mapToObj(records::get)
                .map(r -> r.getRecordKey(SCHEMA, RECORD_KEY_FIELD))
                .collect(Collectors.toSet());
        return records.stream()
                .filter(r -> {
                    String key = r.getRecordKey(SCHEMA, RECORD_KEY_FIELD);
                    return fullKey ? keys.contains(key) : keys.stream().anyMatch(key::startsWith);
                })
                .collect(Collectors.toList());
    }

    /** Returns the record key of each record, in list order. */
    private static List<String> extractRecordKeys(List<HoodieRecord> records) {
        return records.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList());
    }

    /** Drains the iterator into a list; the caller remains responsible for closing it. */
    private static List<HoodieRecord> collectRecords(ClosableIterator<HoodieRecord<Object>> recordIterator) {
        List<HoodieRecord> retrieved = new ArrayList<>();
        recordIterator.forEachRemaining(retrieved::add);
        return retrieved;
    }

    /**
     * Asserts that both lists hold the same records, ignoring order: each side is sorted
     * by record key, the sizes are compared, then the records are compared pairwise.
     */
    private static void verifyRecords(List<HoodieRecord> expectedRecords, List<HoodieRecord> actualRecords) {
        List<GenericRecord> expectedGenericRecords = convertAndSortRecords(expectedRecords);
        List<GenericRecord> actualGenericRecords = convertAndSortRecords(actualRecords);
        Assertions.assertEquals(expectedGenericRecords.size(), actualGenericRecords.size(), "Record count mismatch");
        for (int i = 0; i < expectedGenericRecords.size(); ++i) {
            // Compare one record at a time so a failure reports the offending position.
            Assertions.assertEquals(expectedGenericRecords.get(i), actualGenericRecords.get(i),
                    "Record data mismatch at sorted index " + i);
        }
    }

    /** Extracts each record's data as a {@link GenericRecord} and sorts by the string value of the key field. */
    private static List<GenericRecord> convertAndSortRecords(List<HoodieRecord> hoodieRecords) {
        return hoodieRecords.stream()
                .map(HoodieRecord::getData)
                .map(r -> (GenericRecord) r)
                .sorted(Comparator.comparing(r -> r.get(RECORD_KEY_FIELD).toString()))
                .collect(Collectors.toList());
    }
}

