/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.functional;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HadoopMapRedUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

public class TestHoodieLogFormat
extends HoodieCommonTestHarness {
    // Data block type used by the tests that are not parameterized over the block type.
    private static final HoodieLogBlock.HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
    // NOTE(review): not referenced in the visible part of this file; presumably a buffer size in bytes - confirm against its users.
    private static final int BUFFER_SIZE = 4096;
    // Test HDFS service; stays null when setUpClass uses an external HDFS instead.
    private static HdfsTestService hdfsTestService;
    // Storage shared by every test in this class; assigned once in setUpClass.
    private static HoodieStorage storage;
    // "partition_path" directory under the per-test basePath; created in setUp.
    private StoragePath partitionPath;
    // String form of the ".spillable_path" directory under the working directory; deleted in tearDown.
    private String spillableBasePath;

    /**
     * Creates the {@link HoodieStorage} shared by all tests in this class.
     *
     * <p>When {@code HoodieTestUtils.shouldUseExternalHdfs()} is true, the storage wraps the
     * file system returned by {@code HoodieTestUtils.useExternalHdfs()}. Otherwise a new
     * {@link HdfsTestService} is started and its file system is wrapped instead.
     *
     * @throws IOException if the file system cannot be obtained
     */
    @BeforeAll
    public static void setUpClass() throws IOException {
        final FileSystem fileSystem;
        if (HoodieTestUtils.shouldUseExternalHdfs()) {
            fileSystem = HoodieTestUtils.useExternalHdfs();
        } else {
            // Keep a handle on the service so that tearDownClass can stop it again.
            hdfsTestService = new HdfsTestService();
            fileSystem = hdfsTestService.start(true).getFileSystem();
        }
        storage = new HoodieHadoopStorage(fileSystem);
    }

    /**
     * Stops the test HDFS service if {@code setUpClass} started one; does nothing otherwise.
     */
    @AfterAll
    public static void tearDownClass() {
        if (hdfsTestService == null) {
            // No service was started by this class, so there is nothing to stop.
            return;
        }
        hdfsTestService.stop();
    }

    /**
     * Prepares the per-test directories and initializes a MERGE_ON_READ table at the base path.
     *
     * <p>The base path is placed under the storage's working directory and is named after the
     * test's display name followed by the current time in milliseconds.
     *
     * @param testInfo JUnit-provided information about the running test
     * @throws IOException if a directory or the table cannot be created
     * @throws InterruptedException declared for callers; not thrown by the visible code directly
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws IOException, InterruptedException {
        final FileSystem fileSystem = (FileSystem) storage.getFileSystem();
        final String workingDir = fileSystem.getWorkingDirectory().toString();
        final String testDirName = testInfo.getDisplayName() + System.currentTimeMillis();

        this.basePath = new StoragePath(workingDir, testDirName).toString();
        this.partitionPath = new StoragePath(this.basePath, "partition_path");
        this.spillableBasePath = new StoragePath(workingDir, ".spillable_path").toString();

        final boolean partitionCreated = storage.createDirectory(this.partitionPath);
        Assertions.assertTrue(partitionCreated);
        HoodieTestUtils.init(storage.getConf().newInstance(), this.basePath, HoodieTableType.MERGE_ON_READ);
    }

    /**
     * Deletes the directories used by the test: the base path, the partition path and the
     * spillable path, in that order.
     *
     * @throws IOException if a directory cannot be deleted
     */
    @AfterEach
    public void tearDown() throws IOException {
        final StoragePath baseDir = new StoragePath(this.basePath);
        storage.deleteDirectory(baseDir);

        storage.deleteDirectory(this.partitionPath);

        final StoragePath spillableDir = new StoragePath(this.spillableBasePath);
        storage.deleteDirectory(spillableDir);
    }

    /**
     * Verifies that a freshly built writer reports size 0, log version 1 and a log file
     * name that starts with a dot.
     *
     * @throws IOException if the writer cannot be built, queried or closed
     */
    @Test
    public void testEmptyLog() throws IOException {
        // try-with-resources closes the writer even when one of the assertions fails.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            Assertions.assertEquals(0L, writer.getCurrentSize(), "Just created this log, size should be 0");
            Assertions.assertTrue(writer.getLogFile().getFileName().startsWith("."), "Check all log files should start with a .");
            Assertions.assertEquals(1, writer.getLogFile().getLogVersion(), "Version should be 1 for new log created");
        }
    }

    /**
     * Appends one data block of the given type and checks that the writer's size, the
     * length reported by storage and the {@link AppendResult} all agree.
     *
     * @param dataBlockType type of data block to write
     * @throws IOException if writing or reading storage fails
     * @throws InterruptedException if the append is interrupted
     * @throws URISyntaxException if the test records cannot be loaded
     */
    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    public void testBasicAppend(HoodieLogBlock.HoodieLogBlockType dataBlockType) throws IOException, InterruptedException, URISyntaxException {
        // try-with-resources closes the writer even when one of the assertions fails.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(dataBlockType, SchemaTestUtil.generateTestRecords(0, 100), header);
            AppendResult result = writer.appendBlock(dataBlock);
            long size = writer.getCurrentSize();

            Assertions.assertTrue(size > 0L, "We just wrote a block - size should be > 0");
            Assertions.assertEquals(size, storage.getPathInfo(writer.getLogFile().getPath()).getLength(), "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
            Assertions.assertEquals(size, result.size());
            Assertions.assertEquals(writer.getLogFile(), result.logFile());
            // The block is the first content of a new file, so it must start at offset 0.
            Assertions.assertEquals(0L, result.offset());
        }
    }

    /**
     * Verifies roll-over once the size threshold is exceeded.
     *
     * <p>A first writer appends one block. A second writer, built with a threshold one byte
     * below the size written so far, appends to the same file and is then expected to point
     * at a not-yet-existing version 2 file. Its next append must land at offset 0 of a
     * different log file.
     *
     * @throws IOException if writing or reading storage fails
     * @throws InterruptedException if an append is interrupted
     * @throws URISyntaxException if the test records cannot be loaded
     */
    @Test
    public void testRollover() throws IOException, InterruptedException, URISyntaxException {
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        final AppendResult firstAppend;
        final long size;
        // Each writer lives in its own try-with-resources so it is closed even on failure.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header);
            firstAppend = writer.appendBlock(dataBlock);
            size = writer.getCurrentSize();
        }
        Assertions.assertEquals(0L, firstAppend.offset());
        Assertions.assertEquals(size, firstAppend.size());

        // A threshold just below the current size makes the next append exceed it.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .withSizeThreshold(size - 1L)
                .build()) {
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header);
            AppendResult secondAppend = writer.appendBlock(dataBlock);
            Assertions.assertEquals(firstAppend.logFile(), secondAppend.logFile());
            Assertions.assertNotEquals(0L, secondAppend.offset());
            Assertions.assertEquals(0L, writer.getCurrentSize(), "This should be a new log file and hence size should be 0");
            Assertions.assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2");
            StoragePath logFilePath = writer.getLogFile().getPath();
            Assertions.assertFalse(storage.exists(logFilePath), "Path (" + logFilePath + ") must not exist");

            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header);
            AppendResult rolloverAppend = writer.appendBlock(dataBlock);
            Assertions.assertNotEquals(secondAppend.logFile(), rolloverAppend.logFile());
            Assertions.assertEquals(0L, rolloverAppend.offset());
        }
    }

    /**
     * Runs the concurrent-append scenario with {@code logFileExists = true} and
     * {@code newLogFileFormat = false}, i.e. the second writer is built without an explicit
     * log version or write token.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testConcurrentAppendOnExistingLogFileWithoutWriteToken() throws Exception {
        final boolean logFileExists = true;
        final boolean newLogFileFormat = false;
        testConcurrentAppend(logFileExists, newLogFileFormat);
    }

    /**
     * Runs the concurrent-append scenario with {@code logFileExists = true} and
     * {@code newLogFileFormat = true}, i.e. both writers are built with log version 1 and
     * the same write token.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testConcurrentAppendOnExistingLogFileWithWriteToken() throws Exception {
        final boolean logFileExists = true;
        final boolean newLogFileFormat = true;
        testConcurrentAppend(logFileExists, newLogFileFormat);
    }

    /**
     * Runs the concurrent-append scenario with {@code logFileExists = false} and
     * {@code newLogFileFormat = true}, i.e. both writers start at
     * {@code HoodieLogFile.LOGFILE_BASE_VERSION} with the same write token.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testConcurrentAppendOnFirstLogFileVersion() throws Exception {
        final boolean logFileExists = false;
        final boolean newLogFileFormat = true;
        testConcurrentAppend(logFileExists, newLogFileFormat);
    }

    /**
     * Opens two writers for the same file id and base commit, appends the same block through
     * each, and asserts that the second writer ended up exactly one log version after the first.
     *
     * <p>The first builder always gets an explicit log version and the write token
     * {@code "1-0-1"}. The second builder gets the same settings only when
     * {@code newLogFileFormat} is true.
     *
     * @param logFileExists when false (together with {@code newLogFileFormat}), the writers start
     *     at {@code HoodieLogFile.LOGFILE_BASE_VERSION} instead of version 1
     * @param newLogFileFormat whether the second builder is also given a log version and write token
     * @throws Exception if building, appending or closing a writer fails
     */
    private void testConcurrentAppend(boolean logFileExists, boolean newLogFileFormat) throws Exception {
        final String writeToken = "1-0-1";
        HoodieLogFormat.WriterBuilder firstBuilder = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage);
        HoodieLogFormat.WriterBuilder secondBuilder = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage);

        // Only the "new format, no existing file" case starts at the base version.
        final int startVersion = (newLogFileFormat && !logFileExists)
                ? HoodieLogFile.LOGFILE_BASE_VERSION.intValue()
                : 1;
        firstBuilder = firstBuilder.withLogVersion(startVersion).withRolloverLogWriteToken(writeToken);
        if (newLogFileFormat) {
            secondBuilder = secondBuilder.withLogVersion(startVersion).withRolloverLogWriteToken(writeToken);
        }

        HoodieLogFormat.Writer firstWriter = firstBuilder.build();
        HashMap<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        List testRecords = SchemaTestUtil.generateTestRecords(0, 100);
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock block = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, testRecords, blockHeader);
        firstWriter.appendBlock(block);

        // The second writer is built only after the first one has already appended.
        HoodieLogFormat.Writer secondWriter = secondBuilder.build();
        secondWriter.appendBlock(block);

        HoodieLogFile firstLogFile = firstWriter.getLogFile();
        HoodieLogFile secondLogFile = secondWriter.getLogFile();
        firstWriter.close();
        secondWriter.close();

        Assertions.assertNotNull(firstLogFile.getLogWriteToken());
        Assertions.assertEquals(firstLogFile.getLogVersion(), secondLogFile.getLogVersion() - 1, "Log Files must have different versions");
    }

    /**
     * Appends three blocks through three consecutively opened writers and checks that the
     * reported size grows each time and matches the length reported by storage. Finally
     * checks that {@code getCurrentSize()} throws {@link IllegalStateException} on a closed writer.
     *
     * @param dataBlockType type of data block to write
     * @throws IOException if writing or reading storage fails
     * @throws URISyntaxException if the test records cannot be loaded
     * @throws InterruptedException if an append is interrupted
     */
    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    public void testMultipleAppend(HoodieLogBlock.HoodieLogBlockType dataBlockType) throws IOException, URISyntaxException, InterruptedException {
        // The header is identical for all three blocks, so it is populated once.
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        final long size1;
        // Each writer lives in its own try-with-resources so it is closed even on failure.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            writer.appendBlock(TestHoodieLogFormat.getDataBlock(dataBlockType, SchemaTestUtil.generateTestRecords(0, 100), header));
            size1 = writer.getCurrentSize();
        }

        final long size2;
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            writer.appendBlock(TestHoodieLogFormat.getDataBlock(dataBlockType, SchemaTestUtil.generateTestRecords(0, 100), header));
            size2 = writer.getCurrentSize();
            Assertions.assertTrue(size2 > size1, "We just wrote a new block - size2 should be > size1");
            Assertions.assertEquals(size2, storage.getPathInfo(writer.getLogFile().getPath()).getLength(), "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        }

        final HoodieLogFormat.Writer closedWriter;
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            writer.appendBlock(TestHoodieLogFormat.getDataBlock(dataBlockType, SchemaTestUtil.generateTestRecords(0, 100), header));
            long size3 = writer.getCurrentSize();
            Assertions.assertTrue(size3 > size2, "We just wrote a new block - size3 should be > size2");
            Assertions.assertEquals(size3, storage.getPathInfo(writer.getLogFile().getPath()).getLength(), "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
            // Keep a reference so the closed-writer behaviour can be checked after the try block.
            closedWriter = writer;
        }
        Assertions.assertThrows(IllegalStateException.class, () -> closedWriter.getCurrentSize(), "getCurrentSize should fail after the logAppender is closed");
    }

    /**
     * Writes the same block twice through two consecutively opened writers on a local file
     * system and expects two entries in the target directory afterwards.
     *
     * @param tempDir JUnit-managed temporary directory used as the local storage root
     * @throws IOException if writing or listing storage fails
     * @throws URISyntaxException if the test records cannot be loaded
     * @throws InterruptedException if an append is interrupted
     */
    @Test
    public void testAppendNotSupported(@TempDir Path tempDir) throws IOException, URISyntaxException, InterruptedException {
        StoragePath localTempDir = new StoragePath(tempDir.toUri().toString());
        HoodieStorage localStorage = HoodieStorageUtils.getStorage(localTempDir.toString(), HoodieTestUtils.getDefaultStorageConf());
        // The scenario only makes sense on the local file system, so verify that first.
        Assertions.assertTrue(localStorage.getFileSystem() instanceof LocalFileSystem);
        StoragePath testPath = new StoragePath(localTempDir, "append_test");
        localStorage.createDirectory(testPath);

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 5), header);

        for (int i = 0; i < 2; ++i) {
            // try-with-resources closes the writer even when the append throws.
            try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(testPath)
                    .withFileExtension(".archive")
                    .withFileId("commits")
                    .overBaseCommit("")
                    .withStorage(localStorage)
                    .build()) {
                writer.appendBlock(dataBlock);
            }
        }

        // One directory entry per writer: the second write did not go into the first file.
        Assertions.assertEquals(2, localStorage.listDirectEntries(testPath).size());
    }

    /**
     * Writes one data block of 100 records and reads it back, expecting the same block type
     * and the same records in the same order.
     *
     * @throws IOException if writing or reading storage fails
     * @throws URISyntaxException if the test records cannot be loaded
     * @throws InterruptedException if the append is interrupted
     */
    @Test
    public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        // Independent copy of the records to compare the read result against.
        List<IndexedRecord> copyOfRecords = records.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

        final HoodieLogFile logFile;
        // try-with-resources closes the writer even when the append throws.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            writer.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header));
            logFile = writer.getLogFile();
        }

        // The reader is closed even when one of the assertions fails.
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, schema)) {
            Assertions.assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
            HoodieLogBlock nextBlock = reader.next();
            Assertions.assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
            HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
            List<IndexedRecord> recordsRead = TestHoodieLogFormat.getRecords(dataBlockRead);
            Assertions.assertEquals(copyOfRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(copyOfRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)");
        }
    }

    /**
     * Writes the same data block repeatedly until more than {@code Integer.MAX_VALUE} bytes
     * have been appended to a single log file, then reads the file back and checks that the
     * first block matches the written records and that the number of blocks read equals the
     * number of blocks written.
     *
     * @throws IOException if writing or reading storage fails
     * @throws URISyntaxException if the test records cannot be loaded
     * @throws InterruptedException if an append is interrupted
     */
    @Test
    public void testHugeLogFileWrite() throws IOException, URISyntaxException, InterruptedException {
        // 0xC0000000L is 3 GiB; the threshold is above the ~2 GiB written below.
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).withSizeThreshold(0xC0000000L).build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List records = SchemaTestUtil.generateTestRecords((int)0, (int)1000);
        // Independent copy of the records to compare the first block read against.
        List copyOfRecords = records.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        // Serialize the block content once and wrap the bytes in a block that is appended on every iteration.
        byte[] dataBlockContentBytes = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(storage);
        HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(HoodieTestUtils.getStorage((String)this.basePath), null, 0L, (long)dataBlockContentBytes.length, 0L);
        HoodieAvroDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable((Object)dataBlockContentBytes), false, logBlockContentLoc, Option.ofNullable((Object)SchemaTestUtil.getSimpleSchema()), header, new HashMap(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
        long writtenSize = 0L;
        int logBlockWrittenNum = 0;
        // Keep appending until the accumulated append sizes reach Integer.MAX_VALUE bytes.
        while (writtenSize < Integer.MAX_VALUE) {
            AppendResult appendResult = writer.appendBlock((HoodieLogBlock)reusableDataBlock);
            Assertions.assertTrue((appendResult.size() > 0L ? 1 : 0) != 0);
            writtenSize += appendResult.size();
            ++logBlockWrittenNum;
        }
        writer.close();
        // NOTE(review): the trailing `true` presumably enables lazy block reading - confirm against HoodieLogFormat.newReader.
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader((HoodieStorage)storage, (HoodieLogFile)writer.getLogFile(), (Schema)SchemaTestUtil.getSimpleSchema(), (boolean)true);
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"We wrote a block, we should be able to read it");
        HoodieLogBlock nextBlock = (HoodieLogBlock)reader.next();
        Assertions.assertEquals((Object)DEFAULT_DATA_BLOCK_TYPE, (Object)nextBlock.getBlockType(), (String)"The next block should be a data block");
        HoodieDataBlock dataBlockRead = (HoodieDataBlock)nextBlock;
        List<IndexedRecord> recordsRead = TestHoodieLogFormat.getRecords(dataBlockRead);
        Assertions.assertEquals((int)copyOfRecords.size(), (int)recordsRead.size(), (String)"Read records size should be equal to the written records size");
        Assertions.assertEquals(copyOfRecords, recordsRead, (String)"Both records lists should be the same. (ordering guaranteed)");
        // Starts at 1 because the first block has already been consumed above.
        int logBlockReadNum = 1;
        while (reader.hasNext()) {
            reader.next();
            ++logBlockReadNum;
        }
        Assertions.assertEquals((int)logBlockWrittenNum, (int)logBlockReadNum, (String)"All written log should be correctly found");
        reader.close();
    }

    /**
     * Appends three blocks of the given type through three consecutively opened writers and
     * reads them back in order, expecting each block to hold exactly the records written to it.
     *
     * @param dataBlockType type of data block to write
     * @throws IOException if writing or reading storage fails
     * @throws URISyntaxException if the test records cannot be loaded
     * @throws InterruptedException if an append is interrupted
     */
    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    public void testBasicAppendAndRead(HoodieLogBlock.HoodieLogBlockType dataBlockType) throws IOException, URISyntaxException, InterruptedException {
        final int blockCount = 3;
        Schema schema = SchemaTestUtil.getSimpleSchema();
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

        // Expected content per block, in write order.
        List<List<IndexedRecord>> expectedBlocks = new ArrayList<>();
        HoodieLogFile logFile = null;
        for (int i = 0; i < blockCount; i++) {
            // A new writer per block; try-with-resources closes it even when the append throws.
            try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(this.partitionPath)
                    .withFileExtension(".log")
                    .withFileId("test-fileid1")
                    .overBaseCommit("100")
                    .withStorage(storage)
                    .build()) {
                List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
                List<IndexedRecord> copyOfRecords = records.stream()
                        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
                        .collect(Collectors.toList());
                expectedBlocks.add(copyOfRecords);
                writer.appendBlock(TestHoodieLogFormat.getDataBlock(dataBlockType, records, header));
                // The log file of the last writer is the one that gets read back.
                logFile = writer.getLogFile();
            }
        }

        // The reader is closed even when one of the assertions fails.
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, schema)) {
            for (int i = 0; i < blockCount; i++) {
                // Assert instead of ignoring hasNext(), so a missing block fails with a clear message.
                Assertions.assertTrue(reader.hasNext(), "Block " + (i + 1) + " of " + blockCount + " should be available");
                HoodieDataBlock dataBlockRead = (HoodieDataBlock) reader.next();
                List<IndexedRecord> recordsRead = TestHoodieLogFormat.getRecords(dataBlockRead);
                List<IndexedRecord> expectedRecords = expectedBlocks.get(i);
                Assertions.assertEquals(expectedRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size");
                Assertions.assertEquals(expectedRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)");
                Assertions.assertEquals(schema, dataBlockRead.getSchema());
            }
        }
    }

    /**
     * Writes one CDC data block holding an insert, an update and a delete record (built with
     * the DATA_BEFORE_AFTER supplemental logging mode) and checks the before/after images of
     * each record after reading the block back.
     *
     * @throws IOException if writing or reading storage fails
     * @throws InterruptedException if the append is interrupted
     */
    @Test
    public void testCDCBlock() throws IOException, InterruptedException {
        String dataSchemaString = "{\"type\":\"record\",\"name\":\"Record\",\"fields\":[{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}]}";
        Schema dataSchema = new Schema.Parser().parse(dataSchemaString);
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER, dataSchema);

        GenericData.Record insertedRecord = new GenericData.Record(dataSchema);
        insertedRecord.put("uuid", 1);
        insertedRecord.put("name", "apple");
        insertedRecord.put("ts", 1100L);
        GenericData.Record updateBeforeImageRecord = new GenericData.Record(dataSchema);
        updateBeforeImageRecord.put("uuid", 2);
        updateBeforeImageRecord.put("name", "banana");
        updateBeforeImageRecord.put("ts", 1000L);
        GenericData.Record updateAfterImageRecord = new GenericData.Record(dataSchema);
        updateAfterImageRecord.put("uuid", 2);
        updateAfterImageRecord.put("name", "blueberry");
        updateAfterImageRecord.put("ts", 1100L);
        GenericData.Record deletedRecord = new GenericData.Record(dataSchema);
        deletedRecord.put("uuid", 3);
        deletedRecord.put("name", "cherry");
        deletedRecord.put("ts", 1000L);

        // An insert has no before image, a delete has no after image.
        GenericData.Record record1 = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100", null, insertedRecord);
        GenericData.Record record2 = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100", updateBeforeImageRecord, updateAfterImageRecord);
        GenericData.Record record3 = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100", deletedRecord, null);
        List<IndexedRecord> records = new ArrayList<>(Arrays.asList(record1, record2, record3));

        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());

        final HoodieLogFile logFile;
        // try-with-resources closes the writer even when the append throws.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            writer.appendBlock(TestHoodieLogFormat.getDataBlock(HoodieLogBlock.HoodieLogBlockType.CDC_DATA_BLOCK, records, header));
            logFile = writer.getLogFile();
        }

        // The reader is closed even when one of the assertions fails.
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, cdcSchema)) {
            Assertions.assertTrue(reader.hasNext());
            HoodieDataBlock dataBlockRead = (HoodieDataBlock) reader.next();
            List<IndexedRecord> recordsRead = TestHoodieLogFormat.getRecords(dataBlockRead);
            Assertions.assertEquals(3, recordsRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(cdcSchema, dataBlockRead.getSchema());

            // Field 0 of a CDC record holds the operation code ("i", "u" or "d").
            GenericRecord insert = (GenericRecord) recordsRead.stream()
                    .filter(record -> "i".equals(record.get(0).toString()))
                    .findFirst()
                    .orElseThrow(() -> new AssertionError("No CDC record with operation 'i' was read"));
            Assertions.assertNull(insert.get("before"));
            Assertions.assertNotNull(insert.get("after"));
            Assertions.assertEquals("apple", ((GenericRecord) insert.get("after")).get("name").toString());

            GenericRecord update = (GenericRecord) recordsRead.stream()
                    .filter(record -> "u".equals(record.get(0).toString()))
                    .findFirst()
                    .orElseThrow(() -> new AssertionError("No CDC record with operation 'u' was read"));
            Assertions.assertNotNull(update.get("before"));
            Assertions.assertNotNull(update.get("after"));
            GenericRecord uBefore = (GenericRecord) update.get("before");
            GenericRecord uAfter = (GenericRecord) update.get("after");
            Assertions.assertEquals("banana", String.valueOf(uBefore.get("name")));
            Assertions.assertEquals(1000L, Long.parseLong(uBefore.get("ts").toString()));
            Assertions.assertEquals("blueberry", String.valueOf(uAfter.get("name")));
            Assertions.assertEquals(1100L, Long.parseLong(uAfter.get("ts").toString()));

            GenericRecord delete = (GenericRecord) recordsRead.stream()
                    .filter(record -> "d".equals(record.get(0).toString()))
                    .findFirst()
                    .orElseThrow(() -> new AssertionError("No CDC record with operation 'd' was read"));
            Assertions.assertNotNull(delete.get("before"));
            Assertions.assertNull(delete.get("after"));
            Assertions.assertEquals("cherry", ((GenericRecord) delete.get("before")).get("name").toString());
        }
    }

    /**
     * Writes 400 generated records through {@code writeLogFiles(..., 4)} and verifies that a merged scan over all
     * resulting log files returns exactly the records that were appended.
     *
     * @param diskMapType                  disk map implementation handed to the scanner builder
     * @param isCompressionEnabled         value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param enableOptimizedLogBlocksScan value passed to {@code withOptimizedLogBlocksScan}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        SchemaTestUtil testUtil = new SchemaTestUtil();
        List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
        Set<HoodieLogFile> logFiles = TestHoodieLogFormat.writeLogFiles(this.partitionPath, schema, genRecords, 4);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());

        // try-with-resources: the scanner is released even when iteration or the assertion fails.
        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(storage)
            .withBasePath(this.basePath)
            .withLogFilePaths(logFilePaths)
            .withReaderSchema(schema)
            .withLatestInstantTime("100")
            .withMaxMemorySizeInBytes(Long.valueOf(10240L))
            .withReverseReader(false)
            .withBufferSize(4096)
            .withSpillableMapBasePath(this.spillableBasePath)
            .withDiskMapType(diskMapType)
            .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
            .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
            .build()) {
            List<IndexedRecord> scannedRecords = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                scannedRecords.add((IndexedRecord)((HoodieAvroRecord)record).getData().getInsertValue(schema).get());
            }
            // Both sides are sorted so the comparison does not depend on scan order.
            Assertions.assertEquals(TestHoodieLogFormat.sort(genRecords), TestHoodieLogFormat.sort(scannedRecords), "Scanner records count should be the same as appended records");
        }
    }

    /**
     * Writes 300 generated records through {@code writeLogFiles(..., 3)}, builds a scanner with
     * {@code withForceFullScan(false)} and checks that {@code scanByFullKeys} yields exactly the records whose keys
     * were requested. A second scan with the same keys must hand back the very same record instances.
     *
     * @param diskMapType                  disk map implementation handed to the scanner builder
     * @param isCompressionEnabled         value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param enableOptimizedLogBlocksScan value passed to {@code withOptimizedLogBlocksScan}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        SchemaTestUtil testUtil = new SchemaTestUtil();
        // Typed list: a raw List would make the stream pipeline below untyped and unassignable.
        List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
        Set<HoodieLogFile> logFiles = TestHoodieLogFormat.writeLogFiles(this.partitionPath, schema, genRecords, 3);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());

        List<String> sampledRecordKeys = Arrays.asList("b190b1fb-392b-4ceb-932d-a72c906127c2", "409e9ad3-5def-45e7-9180-ef579c1c220b", "e6b31f1c-60a8-4577-acf5-7e8ea318b08b", "0c477a9e-e602-4642-8e96-1cfd357b4ba0", "ea076c17-32ae-4659-8caf-6ad538b4dd8d", "7a943e09-3856-4874-83a1-8ee93e158f94", "9cbff584-d8a4-4b05-868b-dc917d6cf841", "bda0b0d8-0c56-43b0-89f9-e090d924586b", "ee118fb3-69cb-4705-a8c4-88a18e8aa1b7", "cb1fbe4d-06c3-4c9c-aea7-2665ffa8b205");
        List<IndexedRecord> sampledRecords = genRecords.stream()
            .filter(r -> sampledRecordKeys.contains(((GenericRecord)r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
            .collect(Collectors.toList());

        // try-with-resources: the scanner is released even when an assertion fails.
        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(storage)
            .withBasePath(this.basePath)
            .withLogFilePaths(logFilePaths)
            .withReaderSchema(schema)
            .withLatestInstantTime("100")
            .withMaxMemorySizeInBytes(Long.valueOf(10240L))
            .withReverseReader(false)
            .withBufferSize(4096)
            .withSpillableMapBasePath(this.spillableBasePath)
            .withDiskMapType(diskMapType)
            .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
            .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
            .withForceFullScan(false)
            .build()) {
            scanner.scanByFullKeys(sampledRecordKeys);
            List<HoodieRecord> scannedHoodieRecords = new ArrayList<>();
            List<IndexedRecord> scannedAvroRecords = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                scannedHoodieRecords.add(record);
                scannedAvroRecords.add((IndexedRecord)((HoodieAvroRecord)record).getData().getInsertValue(schema).get());
            }
            Assertions.assertEquals(TestHoodieLogFormat.sort(sampledRecords), TestHoodieLogFormat.sort(scannedAvroRecords));

            // Scanning the same keys again must return the identical objects, in the same order.
            scanner.scanByFullKeys(sampledRecordKeys);
            List<HoodieRecord> newScannedHoodieRecords = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                newScannedHoodieRecords.add(record);
            }
            Assertions.assertEquals(scannedHoodieRecords.size(), newScannedHoodieRecords.size());
            for (int i = 0; i < scannedHoodieRecords.size(); ++i) {
                Assertions.assertSame(scannedHoodieRecords.get(i), newScannedHoodieRecords.get(i), "Objects have to be identical");
            }
        }
    }

    /**
     * Writes 300 generated records through {@code writeLogFiles(..., 3)}, builds a scanner with
     * {@code withForceFullScan(false)} and checks that {@code scanByKeyPrefixes} with the prefix {@code "00"} yields
     * exactly the four listed records. A second scan with the same prefix must hand back the very same record
     * instances.
     *
     * @param diskMapType                  disk map implementation handed to the scanner builder
     * @param isCompressionEnabled         value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param enableOptimizedLogBlocksScan value passed to {@code withOptimizedLogBlocksScan}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        SchemaTestUtil testUtil = new SchemaTestUtil();
        // Typed list: a raw List would make the stream pipeline below untyped and unassignable.
        List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
        Set<HoodieLogFile> logFiles = TestHoodieLogFormat.writeLogFiles(this.partitionPath, schema, genRecords, 3);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());

        // The expected result: the generated records whose keys are listed here (all start with "00").
        List<String> sampledRecordKeys = Arrays.asList("00509b14-3d1a-4283-9a8c-c72b971a9d06", "006b2f57-9bf7-4634-910c-c91542ea61e5", "007fc45d-7ce2-45be-8765-0b9082412518", "00826e50-73b4-4cb0-9d5a-375554d5e0f7");
        List<IndexedRecord> sampledRecords = genRecords.stream()
            .filter(r -> sampledRecordKeys.contains(((GenericRecord)r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
            .collect(Collectors.toList());
        List<String> sampledKeyPrefixes = Collections.singletonList("00");

        // try-with-resources: the scanner is released even when an assertion fails.
        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(storage)
            .withBasePath(this.basePath)
            .withLogFilePaths(logFilePaths)
            .withReaderSchema(schema)
            .withLatestInstantTime("100")
            .withMaxMemorySizeInBytes(Long.valueOf(10240L))
            .withReverseReader(false)
            .withBufferSize(4096)
            .withSpillableMapBasePath(this.spillableBasePath)
            .withDiskMapType(diskMapType)
            .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
            .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
            .withForceFullScan(false)
            .build()) {
            scanner.scanByKeyPrefixes(sampledKeyPrefixes);
            List<HoodieRecord> scannedHoodieRecords = new ArrayList<>();
            List<IndexedRecord> scannedAvroRecords = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                scannedHoodieRecords.add(record);
                scannedAvroRecords.add((IndexedRecord)((HoodieAvroRecord)record).getData().getInsertValue(schema).get());
            }
            Assertions.assertEquals(TestHoodieLogFormat.sort(sampledRecords), TestHoodieLogFormat.sort(scannedAvroRecords));

            // Scanning the same prefix again must return the identical objects, in the same order.
            scanner.scanByKeyPrefixes(sampledKeyPrefixes);
            List<HoodieRecord> newScannedHoodieRecords = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                newScannedHoodieRecords.add(record);
            }
            Assertions.assertEquals(scannedHoodieRecords.size(), newScannedHoodieRecords.size());
            for (int i = 0; i < scannedHoodieRecords.size(); ++i) {
                Assertions.assertSame(scannedHoodieRecords.get(i), newScannedHoodieRecords.get(i), "Objects have to be identical");
            }
        }
    }

    /**
     * Interleaves valid blocks with hand-written garbage in a single log file and checks the block sequence the
     * reader reports.
     *
     * <p>Phase one: valid, garbage, valid. The reader must return three blocks with the middle one typed
     * {@code CORRUPT_BLOCK}. Phase two appends another garbage section and another valid block; the reader must then
     * return five blocks, the fourth being {@code CORRUPT_BLOCK}.
     */
    @Test
    public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFile logFile = this.addValidBlock("test-fileId1", "100", 100);

        // Hand-written bytes after MAGIC that the reader is expected to surface as a corrupt block.
        // try-with-resources closes the stream even if one of the writes throws.
        try (FSDataOutputStream outputStream = (FSDataOutputStream)storage.append(logFile.getPath())) {
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.writeLong(474L);
            outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            outputStream.writeInt(1);
            outputStream.writeLong(400L);
            outputStream.write(StringUtils.getUTF8Bytes("something-random"));
            outputStream.flush();
        }

        logFile = this.addValidBlock("test-fileId1", "100", 10);
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");
            HoodieLogBlock block = (HoodieLogBlock)reader.next();
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "There should be no more block left");
        }

        // Second garbage section, followed by one more valid block.
        try (FSDataOutputStream outputStream = (FSDataOutputStream)storage.append(logFile.getPath())) {
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.writeLong(1000L);
            outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            outputStream.writeInt(1);
            outputStream.writeLong(500L);
            outputStream.write(StringUtils.getUTF8Bytes("something-else-random"));
            outputStream.flush();
        }

        logFile = this.addValidBlock("test-fileId1", "100", 100);
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should get the 2nd corrupted block next");
            HoodieLogBlock block = (HoodieLogBlock)reader.next();
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
            Assertions.assertTrue(reader.hasNext(), "We should get the last block next");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "We should have no more blocks left");
        }
    }

    /**
     * Checks how a reader treats a corrupted file depending on the storage it is bound to.
     *
     * <p>With the reader exactly as returned by {@code createCorruptedFile}, the first block read must be typed
     * {@code CORRUPT_BLOCK}. After the reader's private {@code storage} field is replaced with a mock whose scheme is
     * {@code "gs"}, {@code next()} must instead throw an {@link IllegalArgumentException} mentioning
     * "Invalid block byte type found".
     */
    @Test
    public void testSkipCorruptedCheck() throws Exception {
        try (HoodieLogFormat.Reader reader1 = this.createCorruptedFile("test-fileid1")) {
            HoodieLogBlock block = (HoodieLogBlock)reader1.next();
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
        }

        try (HoodieLogFormat.Reader reader2 = this.createCorruptedFile("test-fileid2")) {
            Assertions.assertTrue(reader2.hasNext(), "We should have corrupted block next");

            // Swap the reader's storage for a mock reporting the "gs" scheme via reflection.
            Field storageField = reader2.getClass().getDeclaredField("storage");
            storageField.setAccessible(true);
            HoodieStorage mockStorage = Mockito.mock(HoodieStorage.class);
            Mockito.when(mockStorage.getScheme()).thenReturn("gs");
            storageField.set(reader2, mockStorage);

            IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class, () -> reader2.next());
            Assertions.assertTrue(exception.getMessage().contains("Invalid block byte type found"));
        }
    }

    /**
     * Appends nothing but the {@code MAGIC} marker between two valid blocks and checks that the reader reports the
     * sequence valid, {@code CORRUPT_BLOCK}, valid and then ends.
     */
    @Test
    public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFile logFile = this.addValidBlock("test-fileId1", "100", 100);

        // Only the magic bytes are written; try-with-resources closes the stream even if the write throws.
        try (FSDataOutputStream outputStream = (FSDataOutputStream)storage.append(logFile.getPath())) {
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.flush();
        }

        logFile = this.addValidBlock("test-fileId1", "100", 10);
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");
            HoodieLogBlock block = (HoodieLogBlock)reader.next();
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "There should be no more block left");
        }
    }

    /**
     * Appends a single data block of generated test records to the log of the given file id and returns the log
     * file reported by the writer after it has been closed.
     *
     * <p>Note that the block header always carries instant time {@code "100"}; {@code commitTime} is only used as
     * the writer's base commit.
     *
     * @param fileId     file id handed to the writer builder
     * @param commitTime base commit handed to the writer builder
     * @param numRecords number of records requested from {@code SchemaTestUtil.generateTestRecords(0, numRecords)}
     * @return the writer's log file, read after {@code close()}
     */
    private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId(fileId)
            .overBaseCommit(commitTime)
            .withStorage(storage)
            .build();

        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        HoodieDataBlock validBlock = TestHoodieLogFormat.getDataBlock(
            DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, numRecords), blockHeader);
        logWriter.appendBlock(validBlock);
        logWriter.close();
        return logWriter.getLogFile();
    }

    /**
     * Writes a valid block, a hand-written garbage section and another valid block, then checks that the reader
     * reports the garbage as a {@code CORRUPT_BLOCK} whose content location ends exactly at the stream position
     * recorded right after the garbage was written.
     */
    @Test
    public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
        writer.appendBlock(dataBlock);
        // Closed explicitly because getLogFile() is read from the writer afterwards.
        writer.close();

        // Hand-written bytes after MAGIC that the reader is expected to surface as a corrupt block.
        long corruptBlockEndPos;
        try (FSDataOutputStream outputStream = (FSDataOutputStream)storage.append(writer.getLogFile().getPath())) {
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.writeLong(474L);
            outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            outputStream.writeInt(1);
            outputStream.writeLong(400L);
            outputStream.write(StringUtils.getUTF8Bytes("something-random"));
            // Position right after the garbage; the corrupt block must end here.
            corruptBlockEndPos = outputStream.getPos();
            outputStream.flush();
        }

        writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        records = SchemaTestUtil.generateTestRecords(0, 10);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
        writer.appendBlock(dataBlock);
        writer.close();

        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");
            HoodieLogBlock block = (HoodieLogBlock)reader.next();
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
            Assertions.assertEquals((long)corruptBlockEndPos, (long)((HoodieLogBlock.HoodieLogBlockContentLocation)block.getBlockContentLocation().get()).getBlockEndPos());
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "There should be no more block left");
        }
    }

    /**
     * Appends two data blocks of 100 generated records each (both with instant time "100") through a writer
     * configured with a size threshold of 500, then delegates to {@code checkLogBlocksAndKeys} with 200/200 and the
     * record keys of both batches.
     *
     * @param diskMapType                  forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled         forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        SchemaTestUtil testUtil = new SchemaTestUtil();
        List<IndexedRecord> copyOfRecords1;
        List<IndexedRecord> copyOfRecords2;

        // try-with-resources: the writer is closed even if generating or appending a block fails.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).withSizeThreshold(500L).build()) {
            List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
            // Copies are taken before the originals are handed to the data block.
            copyOfRecords1 = records1.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
            writer.appendBlock(dataBlock);

            List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
            copyOfRecords2 = records2.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
            writer.appendBlock(dataBlock);
        }

        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        copyOfRecords1.addAll(copyOfRecords2);
        Set<String> originalKeys = copyOfRecords1.stream()
            .map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        this.checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 200, 200, Option.of(originalKeys));
    }

    /**
     * Appends, in order: a data block for instant "100", a data block for instant "101", a rollback command block
     * (instant "102", target instant "101") and a data block for instant "102". Delta commits are created for "100"
     * and "102" only. The test then delegates to {@code checkLogBlocksAndKeys} with 200/200 and the record keys of
     * the first and last data blocks, i.e. the "101" batch is not part of the expected keys.
     *
     * @param diskMapType                  forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled         forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        SchemaTestUtil testUtil = new SchemaTestUtil();
        List<IndexedRecord> copyOfRecords1;
        List<IndexedRecord> copyOfRecords3;

        // try-with-resources: the writer is closed even if generating or appending a block fails.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build()) {
            // Data block for instant 100.
            List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
            copyOfRecords1 = records1.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
            // A fresh, correctly typed header map is created for every block.
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
            writer.appendBlock(dataBlock);

            // Data block for instant 101.
            header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
            writer.appendBlock(dataBlock);

            // Rollback command block written at instant 102, targeting instant 101.
            header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
            HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
            writer.appendBlock(commandBlock);

            // Data block for instant 102.
            header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
            copyOfRecords3 = records3.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
            writer.appendBlock(dataBlock);
        }

        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);
        copyOfRecords1.addAll(copyOfRecords3);
        Set<String> originalKeys = copyOfRecords1.stream()
            .map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        this.checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 200, 200, Option.of(originalKeys));
    }

    /**
     * Appends a data block for instant "100", then hand-writes a block-like byte sequence (header instant "101")
     * directly to the log file, then appends a data block for instant "103". Delta commits are created for "100" and
     * "103" only. The test delegates to {@code checkLogBlocksAndKeys} with 200/200 and the record keys of the two
     * properly appended data blocks.
     *
     * @param diskMapType                  forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled         forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        SchemaTestUtil testUtil = new SchemaTestUtil();
        List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
        List<IndexedRecord> copyOfRecords1 = records1.stream()
            .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
            .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
        writer.appendBlock(dataBlock);
        // Closed explicitly because getLogFile() is read from the writer afterwards.
        writer.close();

        // Hand-written bytes for instant 101, written straight to the file instead of through the writer.
        // try-with-resources closes the stream even if one of the writes throws.
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        try (FSDataOutputStream outputStream = (FSDataOutputStream)storage.append(writer.getLogFile().getPath())) {
            byte[] payload = StringUtils.getUTF8Bytes("something-random");
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.writeLong(1000L);
            outputStream.writeInt(1);
            outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            outputStream.write(HoodieLogBlock.getLogMetadataBytes(header));
            outputStream.writeLong(payload.length);
            outputStream.write(payload);
            outputStream.flush();
        }

        List<IndexedRecord> copyOfRecords3;
        try (HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build()) {
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
            copyOfRecords3 = records3.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
            secondWriter.appendBlock(dataBlock);
        }

        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        FileCreateUtils.createDeltaCommit(this.basePath, "103", storage);
        copyOfRecords1.addAll(copyOfRecords3);
        Set<String> originalKeys = copyOfRecords1.stream()
            .map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        this.checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 200, 200, Option.of(originalKeys));
    }

    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        SchemaTestUtil testUtil = new SchemaTestUtil();
        List records1 = testUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords1 = records1.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
        writer.appendBlock((HoodieLogBlock)dataBlock);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        List records2 = testUtil.generateHoodieTestRecords(0, 100);
        List copyOfRecords2 = records2.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)record), (Schema)schema)).collect(Collectors.toList());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
        writer.appendBlock((HoodieLogBlock)dataBlock);
        List<DeleteRecord> deletedRecords = copyOfRecords1.stream().map(s -> DeleteRecord.create((String)((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), (String)((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())).collect(Collectors.toList()).subList(0, 50);
        copyOfRecords2.addAll(copyOfRecords1);
        List originalKeys = copyOfRecords2.stream().map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[50]), header);
        writer.appendBlock((HoodieLogBlock)deleteBlock);
        List allLogFiles = FSUtils.getAllLogFiles((HoodieStorage)storage, (StoragePath)this.partitionPath, (String)"test-fileid1", (String)".log", (String)"100").map(s -> s.getPath().toString()).collect(Collectors.toList());
        FileCreateUtils.createDeltaCommit((String)this.basePath, (String)"100", (HoodieStorage)storage);
        FileCreateUtils.createDeltaCommit((String)this.basePath, (String)"101", (HoodieStorage)storage);
        FileCreateUtils.createDeltaCommit((String)this.basePath, (String)"102", (HoodieStorage)storage);
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder().withStorage(storage).withBasePath(this.basePath).withLogFilePaths(allLogFiles).withReaderSchema(schema).withLatestInstantTime("102").withMaxMemorySizeInBytes(Long.valueOf(10240L)).withReverseReader(false).withBufferSize(4096).withSpillableMapBasePath(this.spillableBasePath).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(isCompressionEnabled).withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan).build();
        Assertions.assertEquals((long)200L, (long)scanner.getTotalLogRecords(), (String)"We still would read 200 records");
        ArrayList readKeys = new ArrayList(200);
        ArrayList emptyPayloads = new ArrayList();
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        scanner.forEach(s -> {
            try {
                if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
                    emptyPayloads.add(true);
                }
            }
            catch (IOException io) {
                throw new UncheckedIOException(io);
            }
        });
        Assertions.assertEquals((int)200, (int)readKeys.size(), (String)"Stream collect should return all 200 records");
        Assertions.assertEquals((int)50, (int)emptyPayloads.size(), (String)"Stream collect should return all 50 records with empty payloads");
        originalKeys.removeAll(deletedRecords);
        Collections.sort(originalKeys);
        Collections.sort(readKeys);
        Assertions.assertEquals(originalKeys, readKeys, (String)"CompositeAvroLogReader should return 150 records from 2 versions");
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
        writer.appendBlock((HoodieLogBlock)commandBlock);
        FileCreateUtils.deleteDeltaCommit((String)this.basePath, (String)"101", (HoodieStorage)storage);
        readKeys.clear();
        scanner.close();
        scanner = HoodieMergedLogRecordScanner.newBuilder().withStorage(storage).withBasePath(this.basePath).withLogFilePaths(allLogFiles).withReaderSchema(schema).withLatestInstantTime("103").withMaxMemorySizeInBytes(Long.valueOf(10240L)).withReverseReader(false).withBufferSize(4096).withSpillableMapBasePath(this.spillableBasePath).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(isCompressionEnabled).withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan).build();
        scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
        ArrayList newEmptyPayloads = new ArrayList();
        scanner.forEach(s -> {
            try {
                if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
                    newEmptyPayloads.add(true);
                }
            }
            catch (IOException io) {
                throw new UncheckedIOException(io);
            }
        });
        Assertions.assertEquals((int)100, (int)readKeys.size(), (String)"Stream collect should return 100 records, since 2nd block is rolled back");
        Assertions.assertEquals((int)50, (int)newEmptyPayloads.size(), (String)"Stream collect should return all 50 records with empty payloads");
        List firstBlockRecords = copyOfRecords1.stream().map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
        Collections.sort(firstBlockRecords);
        Collections.sort(readKeys);
        Assertions.assertEquals(firstBlockRecords, readKeys, (String)"CompositeAvroLogReader should return 150 records from 2 versions");
        writer.close();
        scanner.close();
    }

    /**
     * Checks the merged scan when a delete block is rolled back and the same delete is then
     * written again under the same instant.
     *
     * <p>Blocks appended, in order: data (instant 100), data (instant 101), delete of 50 keys
     * (instant 102), rollback command (instant 103, targeting 102), delete of the same 50 keys
     * (instant 102). This method creates a delta commit file for instant 102 only. A scan up to
     * instant 103 is expected to return all 200 record keys, 50 of them with an empty payload.
     *
     * @param diskMapType spillable-map disk implementation passed to the scanner builder
     * @param isCompressionEnabled value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param enableOptimizedLogBlocksScan value passed to {@code withOptimizedLogBlocksScan}
     * @throws IOException propagated from the log writer, the scanner or the test helpers
     * @throws URISyntaxException propagated from the helpers called below
     * @throws InterruptedException propagated from the helpers called below
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        String fileId = "test-fileid111";
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId(fileId)
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        HoodieMergedLogRecordScanner scanner = null;
        try {
            SchemaTestUtil testUtil = new SchemaTestUtil();

            // First data block, instant 100.
            List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> copyOfRecords1 = records1.stream()
                    .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                    .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, records1, header);
            writer.appendBlock(dataBlock);

            // Second data block, instant 101.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> allRecordsInserted = records2.stream()
                    .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                    .collect(Collectors.toList());
            dataBlock = TestHoodieLogFormat.getDataBlock(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, records2, header);
            writer.appendBlock(dataBlock);
            allRecordsInserted.addAll(copyOfRecords1);

            // Delete the first 50 keys of the first batch, instant 102.
            List<HoodieKey> deletedKeys = copyOfRecords1.stream()
                    .map(s -> new HoodieKey(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                    .collect(Collectors.toList())
                    .subList(0, 50);
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(
                    deletedKeys.stream()
                            .map(deletedKey -> DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
                            .toArray(DeleteRecord[]::new),
                    header);
            writer.appendBlock(deleteBlock);

            List<String> allLogFiles = FSUtils.getAllLogFiles(storage, this.partitionPath, fileId, ".log", "100")
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());

            // Rollback command at instant 103 that targets the delete written at instant 102.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102");
            header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
            HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
            writer.appendBlock(commandBlock);

            // Write the same delete again under instant 102, using a header that carries only the instant time.
            Map<HoodieLogBlock.HeaderMetadataType, String> deleteBlockHeader = new HashMap<>();
            deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            deleteBlock = new HoodieDeleteBlock(
                    deletedKeys.stream()
                            .map(deletedKey -> DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
                            .toArray(DeleteRecord[]::new),
                    deleteBlockHeader);
            writer.appendBlock(deleteBlock);
            FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);

            List<String> readKeys = new ArrayList<>();
            scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(allLogFiles)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("103")
                    .withMaxMemorySizeInBytes(Long.valueOf(10240L))
                    .withReverseReader(false)
                    .withBufferSize(4096)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
                    .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
                    .build();
            scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
            // Count the records whose payload yields no insert value.
            List<Boolean> newEmptyPayloads = new ArrayList<>();
            scanner.forEach(s -> {
                try {
                    if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
                        newEmptyPayloads.add(true);
                    }
                } catch (IOException io) {
                    throw new UncheckedIOException(io);
                }
            });
            Assertions.assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
            Assertions.assertEquals(50, newEmptyPayloads.size(), "Stream collect should return 50 records with empty payloads.");

            List<String> recordKeysInserted = allRecordsInserted.stream()
                    .map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                    .collect(Collectors.toList());
            Collections.sort(recordKeysInserted);
            Collections.sort(readKeys);
            Assertions.assertEquals(recordKeysInserted, readKeys, "Scanner should return the keys of all 200 inserted records");
        } finally {
            // Release both resources even when an assertion or I/O call above fails.
            try {
                writer.close();
            } finally {
                if (scanner != null) {
                    scanner.close();
                }
            }
        }
    }

    /**
     * Checks the merged scan when three delete blocks are written with different ordering values.
     *
     * <p>Blocks appended, in order: data (instant 100), data (instant 101), delete of records 0-9
     * created without an explicit ordering value (instant 102), delete of records 10-19 with
     * ordering value -1 (instant 103), delete of records 20-29 with ordering value 1 (instant 104).
     * The assertions expect 200 scanned records of which only 20 have an empty payload, and the
     * surviving keys exclude only the keys of the first and third delete blocks, i.e. the second
     * delete block is expected to have no effect.
     *
     * <p>NOTE(review): sibling tests fed by the same {@code testArguments} source take a third
     * boolean parameter; confirm that leaving it out here is intentional.
     *
     * @param diskMapType spillable-map disk implementation passed to the scanner builder
     * @param isCompressionEnabled value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @throws IOException propagated from the log writer, the scanner or the test helpers
     * @throws URISyntaxException propagated from the helpers called below
     * @throws InterruptedException propagated from the helpers called below
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        HoodieMergedLogRecordScanner scanner = null;
        try {
            SchemaTestUtil testUtil = new SchemaTestUtil();

            // First data block, instant 100.
            List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> copyOfRecords1 = records1.stream()
                    .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                    .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
            writer.appendBlock(dataBlock);

            // Second data block, instant 101.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> copyOfRecords2 = records2.stream()
                    .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                    .collect(Collectors.toList());
            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
            writer.appendBlock(dataBlock);

            // From here on copyOfRecords1 holds all 200 records (first batch followed by second batch).
            copyOfRecords1.addAll(copyOfRecords2);
            List<String> originalKeys = copyOfRecords1.stream()
                    .map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                    .collect(Collectors.toList());

            // Delete block 1 (instant 102): records 0-9, no explicit ordering value.
            List<DeleteRecord> deleteRecords1 = copyOfRecords1.subList(0, 10).stream()
                    .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                    .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
            writer.appendBlock(deleteBlock1);

            // Delete block 2 (instant 103): records 10-19, ordering value -1.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(
                    copyOfRecords1.subList(10, 20).stream()
                            .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), Integer.valueOf(-1)))
                            .toArray(DeleteRecord[]::new),
                    header);
            writer.appendBlock(deleteBlock2);

            // Delete block 3 (instant 104): records 20-29, ordering value 1.
            List<DeleteRecord> deletedRecords3 = copyOfRecords1.subList(20, 30).stream()
                    .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), Integer.valueOf(1)))
                    .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
            HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
            writer.appendBlock(deleteBlock3);

            List<String> allLogFiles = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());
            FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "101", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "103", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "104", storage);

            scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(allLogFiles)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("104")
                    .withMaxMemorySizeInBytes(Long.valueOf(10240L))
                    .withReverseReader(false)
                    .withBufferSize(4096)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
                    .build();
            Assertions.assertEquals(200L, (long)scanner.getTotalLogRecords(), "We still would read 200 records");

            List<String> readKeys = new ArrayList<>(200);
            List<String> emptyPayloadKeys = new ArrayList<>();
            scanner.forEach(s -> readKeys.add(s.getRecordKey()));
            // Collect the keys of records whose payload yields no insert value.
            scanner.forEach(s -> {
                try {
                    if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
                        emptyPayloadKeys.add(s.getRecordKey());
                    }
                } catch (IOException io) {
                    throw new UncheckedIOException(io);
                }
            });
            Assertions.assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
            Assertions.assertEquals(20, emptyPayloadKeys.size(), "Stream collect should return all 20 records with empty payloads");

            // Only the keys of delete blocks 1 and 3 are removed from the expectation.
            originalKeys.removeAll(deleteRecords1.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toSet()));
            originalKeys.removeAll(deletedRecords3.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toSet()));
            readKeys.removeAll(emptyPayloadKeys);
            Collections.sort(originalKeys);
            Collections.sort(readKeys);
            Assertions.assertEquals(originalKeys, readKeys, "HoodieMergedLogRecordScanner should return 180 records from 4 versions");
        } finally {
            // Release both resources even when an assertion or I/O call above fails.
            try {
                writer.close();
            } finally {
                if (scanner != null) {
                    scanner.close();
                }
            }
        }
    }

    /**
     * Writes a rollback command block twice, with a simulated failure between the two appends.
     *
     * <p>Blocks appended, in order: two data blocks and a delete block (all with instant 100 and
     * target instant 100 in the shared header), then the same rollback command block twice. The
     * result is handed to {@code checkLogBlocksAndKeys} with 0 and 0 as the two numeric
     * expectations (presumably expected log blocks and expected records; confirm against that
     * helper).
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        // First data block.
        List<IndexedRecord> firstBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List<IndexedRecord> firstBatchRewritten = firstBatch.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, blockHeader));

        // Second data block, written with the same header values.
        List<IndexedRecord> secondBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, blockHeader));

        // Delete block covering the first 50 records of the first batch.
        DeleteRecord[] deletes = firstBatchRewritten.stream()
                .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                .collect(Collectors.toList())
                .subList(0, 50)
                .toArray(new DeleteRecord[50]);
        logWriter.appendBlock(new HoodieDeleteBlock(deletes, blockHeader));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(blockHeader);
        try {
            // Attempt 1: the block is written, then the job is treated as failed.
            logWriter.appendBlock(rollbackBlock);
            throw new Exception("simulating failure");
        } catch (Exception ignored) {
            // Swallowed on purpose: the failure only exists to justify the second append below.
        }

        // Attempt 2: the same rollback block is appended again.
        logWriter.appendBlock(rollbackBlock);
        logWriter.close();
        this.checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty());
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", storage);
    }

    /**
     * Writes one data block and one delete block for instant 100, then appends the same rollback
     * command block (target instant 100) twice in a row.
     *
     * <p>The result is handed to {@code checkLogBlocksAndKeys} with 0 and 0 as the two numeric
     * expectations (presumably expected log blocks and expected records; confirm against that
     * helper). The delta commit file for instant 100 is removed afterwards.
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        // Data block.
        List<IndexedRecord> inserted = recordGenerator.generateHoodieTestRecords(0, 100);
        List<IndexedRecord> insertedRewritten = inserted.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserted, blockHeader));

        // Delete block covering the first 50 inserted records.
        DeleteRecord[] deletes = insertedRewritten.stream()
                .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                .collect(Collectors.toList())
                .subList(0, 50)
                .toArray(new DeleteRecord[50]);
        logWriter.appendBlock(new HoodieDeleteBlock(deletes, blockHeader));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // The rollback command block is written twice back to back.
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(blockHeader);
        logWriter.appendBlock(rollbackBlock);
        logWriter.appendBlock(rollbackBlock);
        logWriter.close();

        this.checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty());
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", storage);
    }

    /**
     * Writes one data block for instant 100 followed by a rollback command block whose target
     * instant is 101, an instant for which this test writes no block.
     *
     * <p>The result is handed to {@code checkLogBlocksAndKeys} with 100 and 100 as the two numeric
     * expectations (presumably expected log blocks/records; confirm against that helper).
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        // Data block for instant 100.
        List<IndexedRecord> inserted = recordGenerator.generateHoodieTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserted, blockHeader));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Rollback command block pointing at instant 101; the header still carries instant time 100.
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        logWriter.appendBlock(new HoodieCommandBlock(blockHeader));
        logWriter.close();

        this.checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 100, 100, Option.empty());
    }

    /**
     * Appends the same data block three times plus one delete block for instant 100, then a
     * rollback command block written at instant 101 that targets instant 100.
     *
     * <p>The result is handed to {@code checkLogBlocksAndKeys} for instant 101 with 0 and 0 as the
     * two numeric expectations (presumably expected log blocks and expected records; confirm
     * against that helper).
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param isCompressionEnabled forwarded to {@code checkLogBlocksAndKeys}
     * @param enableOptimizedLogBlocksScan forwarded to {@code checkLogBlocksAndKeys}
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        List<IndexedRecord> inserted = recordGenerator.generateHoodieTestRecords(0, 100);
        List<IndexedRecord> insertedRewritten = inserted.stream()
                .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

        // The very same data block instance is appended three times.
        HoodieDataBlock insertBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserted, blockHeader);
        for (int attempt = 0; attempt < 3; attempt++) {
            logWriter.appendBlock(insertBlock);
        }

        // Delete block covering the first 50 inserted records.
        DeleteRecord[] deletes = insertedRewritten.stream()
                .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                .collect(Collectors.toList())
                .subList(0, 50)
                .toArray(new DeleteRecord[50]);
        logWriter.appendBlock(new HoodieDeleteBlock(deletes, blockHeader));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Rollback written at instant 101, targeting instant 100.
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        logWriter.appendBlock(new HoodieCommandBlock(blockHeader));
        logWriter.close();

        this.checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty());
    }

    /**
     * Checks that the merged scan handles a delete block loaded from a pre-serialized fixture
     * alongside a delete block built through the current constructor. The test is currently
     * disabled (see HUDI-7375).
     *
     * <p>Blocks appended, in order: data (instant 100) whose record keys are produced by
     * {@code genRandomUUID(100, deleteKeyListInV2Block)}, data (instant 101), a delete block whose
     * content bytes come from {@code /format/delete-block-v2-content-10-records.data}
     * (instant 102), and a delete block for the first 60 records of the second batch
     * (instant 103). The scan is expected to return 200 keys, 70 of them with an empty payload,
     * leaving 130 live keys.
     *
     * @param diskMapType spillable-map disk implementation passed to the scanner builder
     * @param isCompressionEnabled value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param enableOptimizedLogBlocksScan value passed to {@code withOptimizedLogBlocksScan}
     * @throws IOException propagated from the fixture read, the log writer or the scanner
     * @throws URISyntaxException propagated from the helpers called below
     * @throws InterruptedException propagated from the helpers called below
     */
    @Disabled(value="HUDI-7375")
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        try {
            // Keys this test expects to be deleted by the fixture-backed delete block.
            List<String> deleteKeyListInV2Block = Arrays.asList("d448e1b8-a0d4-45c0-bf2d-a9e16ff3c8ce", "df3f71cd-5b68-406c-bb70-861179444adb", "cf64885c-af32-463b-8f1b-2f31a39b1afa", "9884e134-0d60-46e8-8a1e-36db0e455c4a", "698544b8-defa-4fa7-ac15-8963f7d0784d", "081c279e-fc6a-4e05-89b7-3136e4cad488", "1041fac7-8a54-47e6-8a2d-d1a650301699", "69c003f8-386d-40a0-9c61-5a903d1d6ac2", "e574d164-f8c4-47cf-b150-264c2364f10e", "d76007d2-9dc8-46ff-bf6f-0789c6ffffc0");
            SchemaTestUtil testUtil = new SchemaTestUtil();

            // First data block, instant 100.
            List<String> recordKeyList = testUtil.genRandomUUID(100, deleteKeyListInV2Block);
            List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, recordKeyList, "0000/00/00", "100");
            List<IndexedRecord> copyOfRecords1 = records1.stream()
                    .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                    .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
            writer.appendBlock(dataBlock);

            // Second data block, instant 101.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> copyOfRecords2 = records2.stream()
                    .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord)record, schema))
                    .collect(Collectors.toList());
            dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
            writer.appendBlock(dataBlock);

            // Load the pre-serialized delete-block content. The buffer length is fixed at 605 bytes,
            // presumably the exact size of the fixture (confirm against the resource file).
            String deleteBlockResource = "/format/delete-block-v2-content-10-records.data";
            byte[] contentBytes = new byte[605];
            try (InputStream inputStream = TestHoodieLogFormat.class.getResourceAsStream(deleteBlockResource)) {
                Assertions.assertNotNull(inputStream, "Missing test resource " + deleteBlockResource);
                // A single read() may return fewer bytes than requested, so keep reading until the
                // buffer is full or the stream ends.
                int filled = 0;
                while (filled < contentBytes.length) {
                    int read = inputStream.read(contentBytes, filled, contentBytes.length - filled);
                    if (read < 0) {
                        break;
                    }
                    filled += read;
                }
            }
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            writer.appendBlock(new HoodieDeleteBlock(Option.of(contentBytes), null, true, Option.empty(), header, Collections.emptyMap()));

            // Delete block built from the first 60 records of the second batch, instant 103.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            List<DeleteRecord> deletedRecords = copyOfRecords2.stream()
                    .map(s -> DeleteRecord.create(((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                    .collect(Collectors.toList())
                    .subList(0, 60);
            writer.appendBlock(new HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[0]), header));

            copyOfRecords2.addAll(copyOfRecords1);
            List<String> originalKeys = copyOfRecords2.stream()
                    .map(s -> ((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                    .collect(Collectors.toList());
            List<String> allLogFiles = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());
            FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "101", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "103", storage);

            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(allLogFiles)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("103")
                    .withMaxMemorySizeInBytes(Long.valueOf(10240L))
                    .withReverseReader(false)
                    .withBufferSize(4096)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
                    .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
                    .build()) {
                Assertions.assertEquals(200L, (long)scanner.getTotalLogRecords(), "We still would read 200 records");
                List<String> readKeys = new ArrayList<>(200);
                List<String> recordKeys = new ArrayList<>(200);
                List<Boolean> emptyPayloads = new ArrayList<>();
                scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
                // Split scanned records into empty payloads and live keys.
                scanner.forEach(s -> {
                    try {
                        if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema, new Properties()).isPresent()) {
                            emptyPayloads.add(true);
                        } else {
                            recordKeys.add(s.getKey().getRecordKey());
                        }
                    } catch (IOException io) {
                        throw new UncheckedIOException(io);
                    }
                });
                Assertions.assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
                Assertions.assertEquals(70, emptyPayloads.size(), "Stream collect should return all 70 records with empty payloads");
                Collections.sort(originalKeys);
                Collections.sort(readKeys);
                Assertions.assertEquals(originalKeys, readKeys, "200 records should be scanned regardless of deletes or not");

                // Drop the keys of both delete blocks to obtain the expected live keys.
                originalKeys.removeAll(deleteKeyListInV2Block);
                originalKeys.removeAll(deletedRecords.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toList()));
                Collections.sort(originalKeys);
                Collections.sort(recordKeys);
                Assertions.assertEquals(originalKeys, recordKeys, "Only 130 records should exist after deletion");
            }
        } finally {
            // The original never closed the writer; release it on every exit path.
            writer.close();
        }
    }

    /**
     * Appends two data blocks (100 records at instant "100", 10 records at instant "101") to one log
     * file, then appends ROLLBACK command blocks targeting "100" and afterwards "101". After each step
     * {@code checkLogBlocksAndKeys} is asked to confirm the expected record/key counts: 110, then 10,
     * then 0.
     */
    @Test
    public void testAvroLogRecordReaderWithRollbackOlderBlocks() throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        String schemaJson = schema.toString();
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();
        ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK;
        String rollbackBlockType =
                String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal());

        // First data block: 100 records written under instant "100".
        List<IndexedRecord> firstBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaJson);
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, blockHeader));
        FileCreateUtils.createDeltaCommit(basePath, "100", storage);

        // Second data block: 10 more records written under instant "101".
        List<IndexedRecord> secondBatch = recordGenerator.generateHoodieTestRecords(100, 10);
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaJson);
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, blockHeader));
        FileCreateUtils.createDeltaCommit(basePath, "101", storage);

        // Both blocks are in place: 100 + 10 records expected.
        checkLogBlocksAndKeys("101", schema, diskMapType, false, false, 110, 110, Option.empty());

        // Roll back the older instant "100"; only the 10 records of "101" are expected to remain.
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, rollbackBlockType);
        logWriter.appendBlock(new HoodieCommandBlock(blockHeader));
        checkLogBlocksAndKeys("101", schema, diskMapType, false, false, 10, 10, Option.empty());

        // Roll back instant "101" as well; nothing is expected to remain.
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, rollbackBlockType);
        logWriter.appendBlock(new HoodieCommandBlock(blockHeader));
        logWriter.close();
        checkLogBlocksAndKeys("101", schema, diskMapType, false, false, 0, 0, Option.empty());
    }

    /**
     * Writes three data blocks for instant "100", interleaves hand-written incomplete blocks and one
     * more data block, then appends a ROLLBACK command block (instant "101") targeting "100" and
     * verifies through {@code checkLogBlocksAndKeys} that no records or keys are reported.
     *
     * <p>The test arguments are forwarded to the verification step so that every parameter
     * combination actually exercises a different scanner configuration.
     *
     * @param diskMapType                  disk map implementation handed to the verification scan
     * @param isCompressionEnabled         whether BitCask disk-map compression is enabled for the scan
     * @param enableOptimizedLogBlocksScan whether the optimized log-block scan is enabled
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil testUtil = new SchemaTestUtil();

        // The same data block (instant "100") is appended three times.
        List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
        writer.appendBlock(dataBlock);
        writer.appendBlock(dataBlock);
        writer.appendBlock(dataBlock);
        writer.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Two hand-written incomplete blocks, each through its own append stream: the magic marker
        // followed by a few fixed-width fields and no block content.
        for (int i = 0; i < 2; i++) {
            try (FSDataOutputStream outputStream = (FSDataOutputStream) storage.append(writer.getLogFile().getPath())) {
                outputStream.write(HoodieLogFormat.MAGIC);
                outputStream.writeLong(1000L);
                outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
                outputStream.writeInt(1);
                outputStream.writeLong(100L);
                outputStream.flush();
            }
        }

        // One more regular data block after the incomplete ones.
        writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        writer.appendBlock(dataBlock);
        writer.close();

        // A third incomplete block, appended to the log file of the writer that was just closed.
        try (FSDataOutputStream outputStream = (FSDataOutputStream) storage.append(writer.getLogFile().getPath())) {
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.writeLong(1000L);
            outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            outputStream.writeInt(1);
            outputStream.writeLong(100L);
            outputStream.flush();
        }

        // Rollback command block written at instant "101" and targeting instant "100".
        writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
                String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        writer.appendBlock(new HoodieCommandBlock(header));
        writer.close();

        // Everything written for "100" is rolled back, so zero records and zero keys are expected.
        // NOTE(review): argument order (compression flag, then optimized-scan flag) assumed from the
        // scanner builder usage elsewhere in this class; confirm against checkLogBlocksAndKeys.
        this.checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty());
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", storage);
    }

    /**
     * Builds a log containing plain data blocks, hand-written incomplete blocks and blocks carrying a
     * {@code COMPACTED_BLOCK_TIMES} header, then scans it with the optimized log-block scan enabled.
     *
     * <p>Blocks written, in order:
     * <ul>
     *   <li>data blocks for instants 100, 101 and 102 (100 records each),</li>
     *   <li>two incomplete blocks written directly to the file,</li>
     *   <li>block 103 with COMPACTED_BLOCK_TIMES "100,101" (200 records),</li>
     *   <li>block 104 with COMPACTED_BLOCK_TIMES "103,102" (300 records),</li>
     *   <li>data blocks for instants 105, 106 and 107 (100 records each),</li>
     *   <li>block 108 with COMPACTED_BLOCK_TIMES "106,107" (200 records).</li>
     * </ul>
     * The scanner is expected to report 108, 105 and 104 as the valid block instants and therefore to
     * read 200 + 100 + 300 = 600 records.
     *
     * @param diskMapType          disk map implementation used by the scanner's spillable map
     * @param isCompressionEnabled whether BitCask disk-map compression is enabled
     */
    @ParameterizedTest
    @MethodSource(value={"testArgumentsWithoutOptimizedScanArg"})
    public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil testUtil = new SchemaTestUtil();

        // Instant 100: first data block.
        List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(records1), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Instant 101: second data block.
        List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(records2), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "101", storage);

        // Instant 102: third data block.
        List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(records3), header);
        writer.appendBlock(dataBlock);
        writer.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);

        // Two hand-written incomplete blocks, each through its own append stream: the magic marker
        // followed by a few fixed-width fields and no block content.
        for (int i = 0; i < 2; i++) {
            try (FSDataOutputStream outputStream = (FSDataOutputStream) storage.append(writer.getLogFile().getPath())) {
                outputStream.write(HoodieLogFormat.MAGIC);
                outputStream.writeLong(1000L);
                outputStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
                outputStream.writeInt(1);
                outputStream.writeLong(100L);
                outputStream.flush();
            }
        }

        writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();

        // Instant 103: block holding the records of 100 and 101, tagged COMPACTED_BLOCK_TIMES "100,101".
        List<IndexedRecord> compactedRecords = Stream.of(records1, records2)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        header.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "100,101");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(compactedRecords), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "103", storage);

        // Instant 104: block holding the records of 103 and 102, tagged COMPACTED_BLOCK_TIMES "103,102".
        List<IndexedRecord> secondCompactedRecords = Stream.of(compactedRecords, records3)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
        header.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "103,102");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(secondCompactedRecords), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "104", storage);

        // Instant 105: plain data block that no later block lists in COMPACTED_BLOCK_TIMES.
        List<IndexedRecord> records6 = testUtil.generateHoodieTestRecords(0, 100);
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "105");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(records6), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "105", storage);

        // Instant 106: plain data block.
        List<IndexedRecord> records7 = testUtil.generateHoodieTestRecords(0, 100);
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "106");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(records7), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "106", storage);

        // Instant 107: plain data block.
        List<IndexedRecord> records8 = testUtil.generateHoodieTestRecords(0, 100);
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "107");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(records8), header);
        writer.appendBlock(dataBlock);
        FileCreateUtils.createDeltaCommit(this.basePath, "107", storage);

        // Instant 108: block holding the records of 106 and 107, tagged COMPACTED_BLOCK_TIMES "106,107".
        List<IndexedRecord> thirdCompactedBlockRecords = Stream.of(records7, records8)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "108");
        header.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "106,107");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList<>(thirdCompactedBlockRecords), header);
        writer.appendBlock(dataBlock);
        writer.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "108", storage);

        List<String> allLogFiles = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                .map(s -> s.getPath().toString())
                .collect(Collectors.toList());

        // Keys of the blocks the scanner is expected to keep: 104 (300), 105 (100) and 108 (200).
        List<String> expectedRecords = Stream.of(secondCompactedRecords, records6, thirdCompactedBlockRecords)
                .flatMap(Collection::stream)
                .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                .sorted()
                .collect(Collectors.toList());
        List<String> expectedBlockInstants = Arrays.asList("108", "105", "104");

        // try-with-resources so the scanner is released even when an assertion fails.
        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(this.basePath)
                .withLogFilePaths(allLogFiles)
                .withReaderSchema(schema)
                .withLatestInstantTime("108")
                .withMaxMemorySizeInBytes(10240L)
                .withReverseReader(false)
                .withBufferSize(4096)
                .withSpillableMapBasePath(this.spillableBasePath)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
                .withOptimizedLogBlocksScan(true)
                .build()) {
            Assertions.assertEquals(600L, scanner.getTotalLogRecords(), "We would read 600 records from scanner");
            List<String> readKeys = new ArrayList<>();
            scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
            Assertions.assertEquals(expectedBlockInstants, scanner.getValidBlockInstants());
            Collections.sort(readKeys);
            Assertions.assertEquals(expectedRecords, readKeys, "Record keys read should be exactly same.");
        }
    }

    /**
     * Writes the leading {@code numRecordsInLog1} and {@code numRecordsInLog2} records of the same
     * generated batch through two separate writers (both for instant "100"), scans all log files of
     * the file group and asserts that the number of merged records equals the larger of the two counts.
     *
     * <p>Any exception raised while preparing or scanning the logs fails the calling test instead of
     * being printed and ignored; assertion failures propagate unchanged.
     *
     * @param numRecordsInLog1             number of records written by the first writer
     * @param numRecordsInLog2             number of records written by the second writer
     * @param diskMapType                  disk map implementation used by the scanner's spillable map
     * @param isCompressionEnabled         whether BitCask disk-map compression is enabled
     * @param enableOptimizedLogBlocksScan whether the optimized log-block scan is enabled
     */
    private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2, ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) {
        try {
            Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
            SchemaTestUtil testUtil = new SchemaTestUtil();
            List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 101);
            List<IndexedRecord> records2 = new ArrayList<>(records);

            // First writer: leading numRecordsInLog1 records.
            HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(this.partitionPath)
                    .withFileExtension(".log")
                    .withFileId("test-fileid1")
                    .overBaseCommit("100")
                    .withStorage(storage)
                    .build();
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records.subList(0, numRecordsInLog1), header);
            writer.appendBlock(dataBlock);
            long size = writer.getCurrentSize();
            writer.close();

            // Second writer: size threshold one byte below what the first writer produced.
            // NOTE(review): presumably this forces a roll-over to a new log file; confirm.
            HoodieLogFormat.Writer writer2 = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(this.partitionPath)
                    .withFileExtension(".log")
                    .withFileId("test-fileid1")
                    .overBaseCommit("100")
                    .withStorage(storage)
                    .withSizeThreshold(size - 1L)
                    .build();
            Map<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<>();
            header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HoodieDataBlock dataBlock2 = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2.subList(0, numRecordsInLog2), header2);
            writer2.appendBlock(dataBlock2);
            writer2.close();
            FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

            List<String> allLogFiles = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                    .map(s -> s.getPath().toString())
                    .collect(Collectors.toList());

            // try-with-resources so the scanner is released even when the assertion fails.
            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(allLogFiles)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("100")
                    .withMaxMemorySizeInBytes(10240L)
                    .withReverseReader(false)
                    .withBufferSize(4096)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
                    .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
                    .build()) {
                Assertions.assertEquals((long) Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(), "We would read 100 records");
            }
        }
        catch (Exception e) {
            // Surface the failure: a swallowed exception would let the calling test pass without
            // ever reaching the assertion above.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            Assertions.fail("Unexpected exception while writing or scanning the log files", e);
        }
    }

    /**
     * Runs the multi-log-file merge scenario with 77 records in the first log and 100 in the second.
     */
    @ParameterizedTest
    @MethodSource("testArguments")
    public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) {
        final int recordsInFirstLog = 77;
        final int recordsInSecondLog = 100;
        testAvroLogRecordReaderMergingMultipleLogFiles(
                recordsInFirstLog, recordsInSecondLog, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
    }

    /**
     * Runs the multi-log-file merge scenario with 100 records in the first log and 66 in the second.
     */
    @ParameterizedTest
    @MethodSource("testArguments")
    public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) {
        final int recordsInFirstLog = 100;
        final int recordsInSecondLog = 66;
        testAvroLogRecordReaderMergingMultipleLogFiles(
                recordsInFirstLog, recordsInSecondLog, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
    }

    /**
     * Runs the multi-log-file merge scenario with 100 records in each of the two logs.
     */
    @ParameterizedTest
    @MethodSource("testArguments")
    public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan) {
        final int recordsInFirstLog = 100;
        final int recordsInSecondLog = 100;
        testAvroLogRecordReaderMergingMultipleLogFiles(
                recordsInFirstLog, recordsInSecondLog, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
    }

    /**
     * Appends three data blocks of 100 records each (one writer per block) and then walks the log file
     * backwards with {@code hasPrev()}/{@code prev()}, asserting that the blocks come back in the order
     * third, second, first and that each block's records equal the records that were written.
     */
    @Test
    public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        Schema schema = SchemaTestUtil.getSimpleSchema();

        // Block 1; the rewritten copy is what the block read back is compared against.
        List<IndexedRecord> firstBatch = SchemaTestUtil.generateTestRecords(0, 100);
        List<IndexedRecord> expectedFirst = firstBatch.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, blockHeader));
        logWriter.close();

        // Block 2, through a freshly built writer.
        logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> secondBatch = SchemaTestUtil.generateTestRecords(0, 100);
        List<IndexedRecord> expectedSecond = secondBatch.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, blockHeader));
        logWriter.close();

        // Block 3, again through a freshly built writer.
        logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> thirdBatch = SchemaTestUtil.generateTestRecords(0, 100);
        List<IndexedRecord> expectedThird = thirdBatch.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, thirdBatch, blockHeader));
        logWriter.close();
        FileCreateUtils.createDeltaCommit(basePath, "100", storage);

        HoodieLogFile logFile = new HoodieLogFile(
                logWriter.getLogFile().getPath(),
                storage.getPathInfo(logWriter.getLogFile().getPath()).getLength());
        // NOTE(review): the trailing `true` presumably switches the reader into reverse mode; confirm.
        try (HoodieLogFileReader reverseReader = new HoodieLogFileReader(storage, logFile, SchemaTestUtil.getSimpleSchema(), 4096, true)) {
            // Newest block first.
            Assertions.assertTrue(reverseReader.hasPrev(), "Last block should be available");
            List<IndexedRecord> recordsRead1 = TestHoodieLogFormat.getRecords((HoodieDataBlock) reverseReader.prev());
            Assertions.assertEquals(expectedThird.size(), recordsRead1.size(), "Third records size should be equal to the written records size");
            Assertions.assertEquals(expectedThird, recordsRead1, "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(reverseReader.hasPrev(), "Second block should be available");
            List<IndexedRecord> recordsRead2 = TestHoodieLogFormat.getRecords((HoodieDataBlock) reverseReader.prev());
            Assertions.assertEquals(expectedSecond.size(), recordsRead2.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedSecond, recordsRead2, "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(reverseReader.hasPrev(), "First block should be available");
            List<IndexedRecord> recordsRead3 = TestHoodieLogFormat.getRecords((HoodieDataBlock) reverseReader.prev());
            Assertions.assertEquals(expectedFirst.size(), recordsRead3.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedFirst, recordsRead3, "Both records lists should be the same. (ordering guaranteed)");

            // Nothing is left before the first block.
            Assertions.assertFalse(reverseReader.hasPrev());
        }
    }

    /**
     * Writes a data block, then a hand-crafted block made of the magic marker, a few ints, the
     * serialized header and the bytes of "something-random", then another data block. Reading the
     * file backwards, the last block must come back as a data block and the following {@code prev()}
     * call must throw {@link CorruptedLogFileException}.
     */
    @Test
    public void testAppendAndReadOnCorruptedLogInReverse() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        Schema schema = SchemaTestUtil.getSimpleSchema();

        // Well-formed first block.
        List<IndexedRecord> firstBatch = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, blockHeader));
        logWriter.close();
        FileCreateUtils.createDeltaCommit(basePath, "100", storage);

        // Hand-crafted block written straight to the file, bypassing the log writer.
        FSDataOutputStream rawStream = (FSDataOutputStream) storage.append(logWriter.getLogFile().getPath());
        rawStream.write(HoodieLogFormat.MAGIC);
        rawStream.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        rawStream.writeInt(1000);
        rawStream.writeInt(1);
        rawStream.write(HoodieLogBlock.getLogMetadataBytes(blockHeader));
        rawStream.write(StringUtils.getUTF8Bytes("something-random"));
        rawStream.flush();
        rawStream.close();

        // Well-formed block after the hand-crafted one.
        logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> secondBatch = SchemaTestUtil.generateTestRecords(0, 100);
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, blockHeader));
        logWriter.close();

        HoodieLogFile logFile = new HoodieLogFile(
                logWriter.getLogFile().getPath(),
                storage.getPathInfo(logWriter.getLogFile().getPath()).getLength());
        try (HoodieLogFileReader reverseReader = new HoodieLogFileReader(storage, logFile, schema, 4096, true)) {
            Assertions.assertTrue(reverseReader.hasPrev(), "Last block should be available");
            HoodieLogBlock lastBlock = reverseReader.prev();
            Assertions.assertTrue(lastBlock instanceof HoodieDataBlock, "Last block should be datablock");
            Assertions.assertTrue(reverseReader.hasPrev(), "Last block should be available");
            // Stepping back onto the hand-crafted block must be reported as corruption.
            Assertions.assertThrows(CorruptedLogFileException.class, reverseReader::prev);
        }
    }

    /**
     * Appends three data blocks of 100 records each (one writer per block), skips backwards over the
     * last two with {@code moveToPrev()} and then reads the first block with {@code prev()}, asserting
     * that its records equal the records written first and that no earlier block exists.
     */
    @Test
    public void testBasicAppendAndTraverseInReverse() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        Schema schema = SchemaTestUtil.getSimpleSchema();

        // Block 1; only this block's content is verified at the end.
        List<IndexedRecord> firstBatch = SchemaTestUtil.generateTestRecords(0, 100);
        List<IndexedRecord> expectedFirst = firstBatch.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, blockHeader));
        logWriter.close();

        // Block 2.
        logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> secondBatch = SchemaTestUtil.generateTestRecords(0, 100);
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, blockHeader));
        logWriter.close();

        // Block 3.
        logWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> thirdBatch = SchemaTestUtil.generateTestRecords(0, 100);
        logWriter.appendBlock(TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, thirdBatch, blockHeader));
        logWriter.close();
        FileCreateUtils.createDeltaCommit(basePath, "100", storage);

        HoodieLogFile logFile = new HoodieLogFile(
                logWriter.getLogFile().getPath(),
                storage.getPathInfo(logWriter.getLogFile().getPath()).getLength());
        try (HoodieLogFileReader reverseReader = new HoodieLogFileReader(storage, logFile, SchemaTestUtil.getSimpleSchema(), 4096, true)) {
            // Step over the third and second blocks without reading them.
            Assertions.assertTrue(reverseReader.hasPrev(), "Third block should be available");
            reverseReader.moveToPrev();
            Assertions.assertTrue(reverseReader.hasPrev(), "Second block should be available");
            reverseReader.moveToPrev();

            // Read the first block and compare it with what was written.
            Assertions.assertTrue(reverseReader.hasPrev(), "First block should be available");
            List<IndexedRecord> recordsRead = TestHoodieLogFormat.getRecords((HoodieDataBlock) reverseReader.prev());
            Assertions.assertEquals(expectedFirst.size(), recordsRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedFirst, recordsRead, "Both records lists should be the same. (ordering guaranteed)");
            Assertions.assertFalse(reverseReader.hasPrev());
        }
    }

    /**
     * Serializes 100 generated records through {@code HoodieAvroDataBlock#getBytes(Schema)} and
     * reads them back with {@code HoodieAvroDataBlock.getBlock}, first with the writer schema and
     * then with a {@code null} reader schema. Both reads must yield an Avro data block whose
     * records equal the originals, element by element.
     */
    @Test
    public void testV0Format() throws IOException, URISyntaxException {
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        // Independent snapshot of the input used as the expectation for both read passes.
        List<IndexedRecord> expected = new ArrayList<IndexedRecord>(records);
        Assertions.assertEquals(100, records.size());
        Assertions.assertEquals(100, expected.size());

        HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), schema);
        byte[] content = dataBlock.getBytes(schema);
        Assertions.assertTrue(content.length > 0);

        // Pass 1 decodes with the writer schema, pass 2 with no reader schema at all.
        for (Schema readerSchema : Arrays.asList(schema, null)) {
            HoodieAvroDataBlock logBlock = HoodieAvroDataBlock.getBlock(content, readerSchema);
            Assertions.assertEquals((Object) HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, (Object) logBlock.getBlockType());
            List<IndexedRecord> readRecords = TestHoodieLogFormat.getRecords((HoodieDataBlock) logBlock);
            Assertions.assertEquals(readRecords.size(), expected.size());
            for (int idx = 0; idx < expected.size(); ++idx) {
                Assertions.assertEquals((Object) expected.get(idx), (Object) readRecords.get(idx));
            }
        }
    }

    /**
     * Writes one data block of the given type holding 1000 generated records, then reads the log
     * file back through a reader configured with a schema projected onto the single field
     * {@code "name"}.
     *
     * <p>Verifies that the records read equal the written records rewritten to the projected
     * schema, that the block reports the projected schema, and that the byte count reported by
     * {@code BenchmarkCounter} matches the per-block-type expectation.
     *
     * @param dataBlockType the data block type under test
     */
    @ParameterizedTest
    @EnumSource(names={"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    public void testDataBlockFormatAppendAndReadWithProjectedSchema(HoodieLogBlock.HoodieLogBlockType dataBlockType) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List records = SchemaTestUtil.generateTestGenericRecords(0, 1000);
        Schema schema = SchemaTestUtil.getSimpleSchema();

        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

        // Initialise the counter before any I/O so the bytes-read figure asserted below is tracked.
        BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter(), ((FileSystem) storage.getFileSystem()).getConf());

        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(dataBlockType, records, (Map<HoodieLogBlock.HeaderMetadataType, String>) header);
        writer.appendBlock(dataBlock);
        writer.close();

        Schema projectedSchema = HoodieAvroUtils.generateProjectionSchema(schema, Collections.singletonList("name"));
        List projectedRecords = HoodieAvroUtils.rewriteRecords(records, projectedSchema);

        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), projectedSchema, false)) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            HoodieDataBlock dataBlockRead = (HoodieDataBlock) ((HoodieLogBlock) reader.next());

            // Expected parquet byte count depends on the Avro and Java versions in use.
            int parquetReadBytes;
            if (!HoodieAvroUtils.gteqAvro1_9()) {
                parquetReadBytes = 1809;
            } else if (HoodieTestUtils.getJavaVersion() == 17 || HoodieTestUtils.getJavaVersion() == 11) {
                parquetReadBytes = 1803;
            } else {
                parquetReadBytes = 1802;
            }
            HashMap<HoodieLogBlock.HoodieLogBlockType, Integer> expectedReadBytes = new HashMap<HoodieLogBlock.HoodieLogBlockType, Integer>();
            expectedReadBytes.put(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, 0);
            expectedReadBytes.put(HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK, 0);
            expectedReadBytes.put(HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK, parquetReadBytes);

            List<IndexedRecord> recordsRead = TestHoodieLogFormat.getRecords(dataBlockRead);
            Assertions.assertEquals(projectedRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals((Object) projectedRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)");
            Assertions.assertEquals((Object) dataBlockRead.getSchema(), (Object) projectedSchema);

            int bytesRead = (int) BenchmarkCounter.getBytesRead();
            Assertions.assertEquals((Integer) expectedReadBytes.get(dataBlockType), bytesRead, "Read bytes have to match");
        }
    }

    /**
     * Builds an empty {@code HoodieDeleteBlock} and, when requested, stores encoded record
     * positions in its {@code RECORD_POSITIONS} header, then checks that the block decodes the
     * same positions. Without the header the test only verifies that the block can be constructed.
     *
     * @param addRecordPositionsHeader whether the record-positions header is written
     */
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testGetRecordPositions(boolean addRecordPositionsHeader) throws IOException {
        HashMap<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        ArrayList<Long> expectedPositions = new ArrayList();
        if (addRecordPositionsHeader) {
            expectedPositions = TestLogReaderUtils.generatePositions();
            blockHeader.put(HoodieLogBlock.HeaderMetadataType.RECORD_POSITIONS, LogReaderUtils.encodePositions(expectedPositions));
        }

        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[0], blockHeader);
        if (!addRecordPositionsHeader) {
            // Nothing was encoded, so there is nothing to compare.
            return;
        }
        TestLogReaderUtils.assertPositionEquals(expectedPositions, deleteBlock.getRecordPositions());
    }

    /**
     * Creates a data block of the requested type from plain Avro records.
     *
     * <p>Each record is wrapped in a {@code HoodieAvroIndexedRecord}, and the block is built with
     * the placeholder reader path {@code "dummy_path"}.
     *
     * @param dataBlockType the kind of data block to create
     * @param records the Avro records to store in the block
     * @param header the header entries to attach to the block
     * @return the newly created data block
     */
    public static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType dataBlockType, List<IndexedRecord> records, Map<HoodieLogBlock.HeaderMetadataType, String> header) {
        List<HoodieRecord> wrapped = new ArrayList<HoodieRecord>();
        for (IndexedRecord avroRecord : records) {
            wrapped.add(new HoodieAvroIndexedRecord(avroRecord));
        }
        StoragePath placeholderPath = new StoragePath("dummy_path");
        return TestHoodieLogFormat.getDataBlock(dataBlockType, wrapped, header, placeholderPath);
    }

    /**
     * Creates the concrete data block implementation for {@code dataBlockType}.
     *
     * <p>CDC, Avro and Parquet blocks are keyed on {@code HoodieRecord.RECORD_KEY_METADATA_FIELD}.
     * HFile and Parquet blocks use the default compression settings from
     * {@code HoodieStorageConfig}.
     *
     * @param dataBlockType the kind of data block to create
     * @param records the records to store in the block
     * @param header the header entries to attach to the block
     * @param pathForReader path handed to the HFile block; ignored by the other block types
     * @return the newly created data block
     * @throws RuntimeException if {@code dataBlockType} is not one of the supported data block types
     */
    private static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType dataBlockType, List<HoodieRecord> records, Map<HoodieLogBlock.HeaderMetadataType, String> header, StoragePath pathForReader) {
        final HoodieDataBlock block;
        switch (dataBlockType) {
            case CDC_DATA_BLOCK:
                block = new HoodieCDCDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
                break;
            case AVRO_DATA_BLOCK:
                block = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
                break;
            case HFILE_DATA_BLOCK: {
                String hfileCompression = (String) HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue();
                boolean useNativeReader = ((Boolean) HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()).booleanValue();
                block = new HoodieHFileDataBlock(records, header, hfileCompression, pathForReader, useNativeReader);
                break;
            }
            case PARQUET_DATA_BLOCK: {
                String parquetCodec = (String) HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.defaultValue();
                block = new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, parquetCodec, 0.1, true);
                break;
            }
            default:
                throw new RuntimeException("Unknown data block type " + dataBlockType);
        }
        return block;
    }

    /**
     * Supplies every combination of disk map type (BITCASK, ROCKS_DB) and two boolean flags as
     * parameterized-test arguments, eight tuples in total.
     *
     * <p>The disk map type varies fastest, then the first flag, then the second flag.
     * NOTE(review): the flags presumably map to "compression enabled" and "optimized log block
     * scan" in the consuming tests, which are outside this view. Confirm against the callers.
     *
     * @return a stream of {@code (diskMapType, firstFlag, secondFlag)} argument tuples
     */
    private static Stream<Arguments> testArguments() {
        ExternalSpillableMap.DiskMapType[] diskMapTypes = {ExternalSpillableMap.DiskMapType.BITCASK, ExternalSpillableMap.DiskMapType.ROCKS_DB};
        boolean[] flagValues = {false, true};

        List<Arguments> combinations = new ArrayList<Arguments>();
        for (boolean secondFlag : flagValues) {
            for (boolean firstFlag : flagValues) {
                for (ExternalSpillableMap.DiskMapType diskMapType : diskMapTypes) {
                    combinations.add(Arguments.arguments(diskMapType, firstFlag, secondFlag));
                }
            }
        }
        return combinations.stream();
    }

    /**
     * Supplies every combination of disk map type (BITCASK, ROCKS_DB) and a single boolean flag
     * as parameterized-test arguments, four tuples in total, with the disk map type varying
     * fastest.
     *
     * @return a stream of {@code (diskMapType, flag)} argument tuples
     */
    private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
        return Stream.of(
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, true),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true));
    }

    /**
     * Convenience overload of the five-argument {@code writeLogFiles} that passes {@code false}
     * for {@code enableBlockSequenceNumbers}.
     *
     * @param partitionPath the directory the log files are written under
     * @param schema the schema recorded in each block header
     * @param records the records to distribute across the blocks
     * @param numFiles the number of blocks to append
     * @return the log files reported by the writer while appending
     */
    private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath, Schema schema, List<IndexedRecord> records, int numFiles) throws IOException, InterruptedException {
        final boolean enableBlockSequenceNumbers = false;
        return TestHoodieLogFormat.writeLogFiles(partitionPath, schema, records, numFiles, enableBlockSequenceNumbers);
    }

    /**
     * Splits {@code records} into {@code numFiles} consecutive chunks and appends each chunk as
     * one data block of the default block type through a single log writer (file id
     * {@code "test-fileid1"}, base commit {@code "100"}, size threshold 1024).
     *
     * <p>Every chunk holds {@code records.size() / numFiles} records, except the last one, which
     * also takes whatever is left over so that no record is dropped. The log file reported by
     * the writer is captured before each append; the returned set therefore contains every
     * distinct log file the writer targeted.
     *
     * @param partitionPath the directory the log files are written under
     * @param schema the schema recorded in each block header
     * @param records the records to distribute across the blocks
     * @param numFiles the number of blocks to append; must be positive
     * @param enableBlockSequenceNumbers currently unused by this helper
     * @return the log files reported by the writer while appending
     */
    private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath, Schema schema, List<IndexedRecord> records, int numFiles, boolean enableBlockSequenceNumbers) throws IOException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(".log").withSizeThreshold(1024L).withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        try {
            HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            HashSet<HoodieLogFile> logFiles = new HashSet<HoodieLogFile>();
            int recordsPerFile = records.size() / numFiles;
            for (int filesWritten = 0; filesWritten < numFiles; ++filesWritten) {
                int offset = filesWritten * recordsPerFile;
                // The last chunk runs to the end of the list so the remainder of the integer
                // division is kept. This also covers fewer records than files, where
                // recordsPerFile is 0 and every earlier chunk is empty.
                int end = filesWritten == numFiles - 1 ? records.size() : offset + recordsPerFile;
                List<IndexedRecord> targetRecords = records.subList(offset, end);
                logFiles.add(writer.getLogFile());
                writer.appendBlock((HoodieLogBlock)TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, header));
            }
            return logFiles;
        } finally {
            // Release the writer even when an append fails part-way through.
            writer.close();
        }
    }

    /**
     * Drains the Avro record iterator of {@code dataBlock} into a list, preserving iteration order.
     *
     * <p>The iterator is closed once it has been consumed (or if iteration fails), so whatever
     * it holds open is released.
     *
     * @param dataBlock the block to read
     * @return the payload of every record in the block, in iteration order
     */
    private static List<IndexedRecord> getRecords(HoodieDataBlock dataBlock) {
        ArrayList<IndexedRecord> elements = new ArrayList<IndexedRecord>();
        try (ClosableIterator<HoodieRecord<IndexedRecord>> itr = dataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
            itr.forEachRemaining(r -> elements.add((IndexedRecord)r.getData()));
        }
        return elements;
    }

    /**
     * Returns a copy of {@code records} ordered by the string form of each record's
     * {@code HoodieRecord.RECORD_KEY_METADATA_FIELD} value. The input list is left untouched.
     *
     * @param records the records to order; each must be a {@code GenericRecord} carrying the key field
     * @return a new list holding the same records in ascending key order
     */
    private static List<IndexedRecord> sort(List<IndexedRecord> records) {
        Comparator<IndexedRecord> byRecordKey = (left, right) -> {
            String leftKey = ((GenericRecord) left).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            String rightKey = ((GenericRecord) right).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            return leftKey.compareTo(rightKey);
        };
        List<IndexedRecord> ordered = new ArrayList<IndexedRecord>(records);
        Collections.sort(ordered, byRecordKey);
        return ordered;
    }

    /**
     * Writes a log file containing one valid data block of 100 records, appends raw bytes after
     * it, and returns a reader that has already consumed the valid block.
     *
     * <p>The appended bytes are the log format magic followed by hand-written numeric fields and
     * the text {@code "something-random"}.
     * NOTE(review): the numeric fields look like a block size, format version, block type and
     * content length chosen to disagree with the payload. Confirm against the log format
     * definition.
     *
     * @param fileId the file id used for the log file
     * @return an open reader positioned just after the first (valid) block; the caller must close it
     * @throws Exception if writing or reading the log file fails
     */
    private HoodieLogFormat.Reader createCorruptedFile(String fileId) throws Exception {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId(fileId).overBaseCommit("100").withStorage(storage).build();
        List records = SchemaTestUtil.generateTestRecords((int)0, (int)100);
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock dataBlock = TestHoodieLogFormat.getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
        writer.appendBlock((HoodieLogBlock)dataBlock);
        writer.close();
        // try-with-resources closes the stream even if one of the raw writes below throws.
        try (FSDataOutputStream outputStream = (FSDataOutputStream)storage.append(writer.getLogFile().getPath())) {
            outputStream.write(HoodieLogFormat.MAGIC);
            outputStream.writeLong(473L);
            outputStream.writeInt(1);
            outputStream.writeInt(10000);
            outputStream.writeLong(400L);
            outputStream.write(StringUtils.getUTF8Bytes((String)"something-random"));
            outputStream.flush();
        }
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader((HoodieStorage)storage, (HoodieLogFile)writer.getLogFile(), (Schema)SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue((boolean)reader.hasNext(), (String)"First block should be available");
        // Skip the valid block so the caller starts at the appended bytes.
        reader.next();
        return reader;
    }

    /**
     * Scans every log file of file id {@code "test-fileid1"} (base commit {@code "100"}) under the
     * partition path with a {@code HoodieMergedLogRecordScanner} and verifies the outcome.
     *
     * <p>Checks the scanner's total log record count and the number of distinct record keys it
     * yields, and, when {@code expectedKeys} is present, the exact key set.
     *
     * @param latestInstantTime the latest instant time handed to the scanner
     * @param schema the reader schema handed to the scanner
     * @param diskMapType the spillable disk map implementation to use
     * @param isCompressionEnabled whether BitCask disk map compression is enabled
     * @param enableOptimizedLogBlocksScan whether the optimized log block scan is enabled
     * @param expectedTotalRecords the expected value of {@code getTotalLogRecords()}
     * @param expectedTotalKeys the expected number of distinct record keys
     * @param expectedKeys the exact keys expected, if they should be compared
     */
    private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords, int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
        List logFilePaths = FSUtils.getAllLogFiles((HoodieStorage) storage, (StoragePath) this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());

        HoodieMergedLogRecordScanner.Builder scannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schema)
                .withLatestInstantTime(latestInstantTime)
                .withMaxMemorySizeInBytes(Long.valueOf(10240L))
                .withReverseReader(false)
                .withBufferSize(4096)
                .withSpillableMapBasePath(this.spillableBasePath)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
                .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);

        try (HoodieMergedLogRecordScanner scanner = scannerBuilder.build()) {
            Assertions.assertEquals((long) expectedTotalRecords, (long) scanner.getTotalLogRecords(), "There should be " + expectedTotalRecords + " records");

            // Collect the distinct record keys the scanner yields.
            HashSet<String> keysSeen = new HashSet<String>();
            scanner.forEach(rec -> keysSeen.add(rec.getKey().getRecordKey()));
            Assertions.assertEquals(expectedTotalKeys, keysSeen.size(), "Read should return return all " + expectedTotalKeys + " keys");

            if (expectedKeys.isPresent()) {
                Assertions.assertEquals((Object) expectedKeys.get(), keysSeen, "Keys read from log file should match written keys");
            }
        }
    }
}

