/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer;
import org.apache.hudi.common.table.read.PositionBasedSchemaHandler;
import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestPositionBasedFileGroupRecordBuffer
extends TestHoodieFileGroupReaderOnSpark {
    private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(57071L);
    private HoodieTableMetaClient metaClient;
    private Schema avroSchema;
    private PositionBasedFileGroupRecordBuffer<InternalRow> buffer;
    private String partitionPath;
    private HoodieReadStats readStats;

    public void prepareBuffer(RecordMergeMode mergeMode, String baseFileInstantTime) throws Exception {
        HashMap<String, String> writeConfigs = new HashMap<String, String>();
        writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
        writeConfigs.put("hoodie.datasource.write.precombine.field", mergeMode.equals((Object)RecordMergeMode.COMMIT_TIME_ORDERING) ? "" : "timestamp");
        writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
        writeConfigs.put("hoodie.table.name", "hoodie_test");
        writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
        writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
        writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
        writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
        writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
        writeConfigs.put("hoodie.compact.inline", "false");
        writeConfigs.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
        writeConfigs.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), mergeMode.name());
        if (mergeMode.equals((Object)RecordMergeMode.CUSTOM)) {
            writeConfigs.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), this.getCustomPayload());
            writeConfigs.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), "00000000-0000-0000-0000-000000000000");
        }
        this.commitToTable(this.dataGen.generateInserts("001", Integer.valueOf(100)), WriteOperationType.INSERT.value(), writeConfigs);
        String[] partitionPaths = this.dataGen.getPartitionPaths();
        this.partitionPath = partitionPaths[0];
        String[] partitionValues = new String[]{this.partitionPath};
        this.metaClient = HoodieTestUtils.createMetaClient(this.getStorageConf(), (String)this.getBasePath());
        this.avroSchema = new TableSchemaResolver(this.metaClient).getTableAvroSchema();
        Option partitionFields = this.metaClient.getTableConfig().getPartitionFields();
        Option partitionNameOpt = StringUtils.isNullOrEmpty((String)partitionPaths[0]) ? Option.empty() : Option.of((Object)partitionPaths[0]);
        HoodieReaderContext<InternalRow> ctx = this.getHoodieReaderContext(this.getBasePath(), this.avroSchema, this.getStorageConf());
        ctx.setTablePath(this.getBasePath());
        ctx.setLatestCommitTime(this.metaClient.createNewInstantTime());
        ctx.setShouldMergeUseRecordPosition(true);
        ctx.setHasBootstrapBaseFile(false);
        ctx.setHasLogFiles(true);
        ctx.setNeedsBootstrapMerge(false);
        if (mergeMode == RecordMergeMode.CUSTOM) {
            ctx.setRecordMerger(Option.of((Object)new CustomMerger()));
        } else {
            ctx.setRecordMerger(Option.empty());
        }
        ctx.setSchemaHandler((FileGroupReaderSchemaHandler)new PositionBasedSchemaHandler(ctx, this.avroSchema, this.avroSchema, Option.empty(), this.metaClient.getTableConfig(), new TypedProperties()));
        TypedProperties props = new TypedProperties();
        props.put((Object)"hoodie.write.record.merge.mode", (Object)mergeMode.name());
        props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
        props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), this.metaClient.getTempFolderPath());
        props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
        props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
        if (mergeMode.equals((Object)RecordMergeMode.CUSTOM)) {
            writeConfigs.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), this.getCustomPayload());
            writeConfigs.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), "00000000-0000-0000-0000-000000000000");
        }
        this.readStats = new HoodieReadStats();
        this.buffer = new PositionBasedFileGroupRecordBuffer(ctx, this.metaClient, mergeMode, partitionNameOpt, partitionFields, baseFileInstantTime, props, this.readStats);
    }

    public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions, String baseFileInstantTime) {
        HashMap<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, this.avroSchema.toString());
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        if (shouldWriteRecordPositions) {
            header.put(HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, baseFileInstantTime);
        }
        return header;
    }

    public List<DeleteRecord> getDeleteRecords() throws IOException, URISyntaxException {
        SchemaTestUtil testUtil = new SchemaTestUtil();
        List records = testUtil.generateHoodieTestRecords(0, 100);
        List<DeleteRecord> deletedRecords = records.stream().map(s -> DeleteRecord.create((String)((GenericRecord)s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), (String)((GenericRecord)s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())).collect(Collectors.toList()).subList(0, 50);
        return deletedRecords;
    }

    public HoodieDeleteBlock getDeleteBlockWithPositions(String baseFileInstantTime) throws IOException, URISyntaxException {
        List<DeleteRecord> deletedRecords = this.getDeleteRecords();
        ArrayList<Pair> deleteRecordList = new ArrayList<Pair>();
        long position = 0L;
        for (DeleteRecord dr : deletedRecords) {
            deleteRecordList.add(Pair.of((Object)dr, (Object)position++));
        }
        return new HoodieDeleteBlock(deleteRecordList, this.getHeader(true, baseFileInstantTime));
    }

    public HoodieDeleteBlock getDeleteBlockWithoutPositions() throws IOException, URISyntaxException {
        List<DeleteRecord> deletedRecords = this.getDeleteRecords();
        ArrayList<Pair> deleteRecordList = new ArrayList<Pair>();
        for (DeleteRecord dr : deletedRecords) {
            deleteRecordList.add(Pair.of((Object)dr, (Object)-1L));
        }
        return new HoodieDeleteBlock(deleteRecordList, this.getHeader(false, ""));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testProcessDeleteBlockWithPositions(boolean sameBaseInstantTime) throws Exception {
        String baseFileInstantTime = "090";
        this.prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING, baseFileInstantTime);
        HoodieDeleteBlock deleteBlock = this.getDeleteBlockWithPositions(sameBaseInstantTime ? baseFileInstantTime : baseFileInstantTime + "1");
        this.buffer.processDeleteBlock(deleteBlock);
        Assertions.assertEquals((int)50, (int)this.buffer.getLogRecords().size());
        if (sameBaseInstantTime) {
            Assertions.assertNotNull((Object)((BufferedRecord)this.buffer.getLogRecords().get(0L)).getRecordKey(), (String)"the record key is set up for fallback handling");
            Assertions.assertNotNull((Object)((BufferedRecord)this.buffer.getLogRecords().get(0L)).getOrderingValue(), (String)"the ordering value is set up for fallback handling");
        } else {
            Assertions.assertNull(this.buffer.getLogRecords().get(0L));
        }
    }

    @Test
    public void testProcessDeleteBlockWithCustomMerger() throws Exception {
        String baseFileInstantTime = "090";
        this.prepareBuffer(RecordMergeMode.CUSTOM, baseFileInstantTime);
        HoodieDeleteBlock deleteBlock = this.getDeleteBlockWithPositions(baseFileInstantTime);
        this.buffer.processDeleteBlock(deleteBlock);
        Assertions.assertEquals((int)50, (int)this.buffer.getLogRecords().size());
        Assertions.assertNotNull((Object)((BufferedRecord)this.buffer.getLogRecords().get(0L)).getRecordKey());
    }

    @Test
    public void testProcessDeleteBlockWithoutPositions() throws Exception {
        this.prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING, "090");
        HoodieDeleteBlock deleteBlock = this.getDeleteBlockWithoutPositions();
        this.buffer.processDeleteBlock(deleteBlock);
        Assertions.assertEquals((int)50, (int)this.buffer.getLogRecords().size());
    }

    public static class CustomMerger
    implements HoodieRecordMerger {
        public String getMergingStrategy() {
            return "random_strategy";
        }

        public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
            throw new IOException("Not implemented");
        }

        public HoodieRecord.HoodieRecordType getRecordType() {
            return HoodieRecord.HoodieRecordType.SPARK;
        }
    }
}

