/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestCopyOnWriteActionExecutor
extends HoodieClientTestBase
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(TestCopyOnWriteActionExecutor.class);
    private static final Schema SCHEMA = SchemaTestUtil.getSchemaFromResource(TestCopyOnWriteActionExecutor.class, (String)"/exampleSchema.avsc");

    private static final Stream<Arguments> indexType() {
        HoodieIndex.IndexType[] data = new HoodieIndex.IndexType[]{HoodieIndex.IndexType.BLOOM, HoodieIndex.IndexType.BUCKET};
        return Stream.of(data).map(xva$0 -> Arguments.of((Object[])new Object[]{xva$0}));
    }

    @Test
    public void testMakeNewPath() {
        String fileName = UUID.randomUUID().toString();
        String partitionPath = "2016/05/04";
        String instantTime = HoodieTestTable.makeNewCommitTime();
        HoodieWriteConfig config = this.makeHoodieClientConfig();
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        Pair newPathWithWriteToken = (Pair)this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> this.lambda$testMakeNewPath$d4eb593f$1(partitionPath, config, instantTime, (HoodieTable)table, fileName, arg_0)).collect().get(0);
        Assertions.assertEquals((Object)((StoragePath)newPathWithWriteToken.getKey()).toString(), (Object)Paths.get(this.basePath, partitionPath, FSUtils.makeBaseFileName((String)instantTime, (String)((String)newPathWithWriteToken.getRight()), (String)fileName, (String)BASE_FILE_EXTENSION)).toString());
    }

    private HoodieWriteConfig makeHoodieClientConfig() {
        return this.makeHoodieClientConfigBuilder().build();
    }

    private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema(SCHEMA.toString()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build());
    }

    private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
        Properties props = new Properties();
        HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder().withIndexType(indexType);
        if (indexType.equals((Object)HoodieIndex.IndexType.BUCKET)) {
            props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
            indexConfig.fromProperties(props).withIndexKeyField("_row_key").withBucketNum("1").withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
            props.putAll((Map<?, ?>)HoodieLayoutConfig.newBuilder().fromProperties(props).withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps());
        }
        props.putAll((Map<?, ?>)indexConfig.build().getProps());
        return props;
    }

    @ParameterizedTest
    @MethodSource(value={"indexType"})
    public void testUpdateRecords(HoodieIndex.IndexType indexType) throws Exception {
        GenericRecord genericRecord;
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withProps((Map)this.makeIndexConfig(indexType)).build();
        String firstCommitTime = HoodieTestTable.makeNewCommitTime();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);
        writeClient.startCommitWithTime(firstCommitTime);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        String partitionPath = "2016/01/31";
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
        String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
        String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
        String recordStr4 = "{\"_row_key\":\"8eb5b87d-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":51}";
        ArrayList<HoodieAvroRecord> records = new ArrayList<HoodieAvroRecord>();
        RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
        records.add(new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), (HoodieRecordPayload)rowChange1));
        RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
        records.add(new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), (HoodieRecordPayload)rowChange2));
        RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
        records.add(new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), (HoodieRecordPayload)rowChange3));
        HoodieSparkCopyOnWriteTable cowTable = table;
        writeClient.insert(this.jsc.parallelize(records, 1), firstCommitTime);
        FileStatus[] allFiles = this.getIncrementalFiles(partitionPath, "0", -1);
        Assertions.assertEquals((int)1, (int)allFiles.length);
        Path filePath = allFiles[0].getPath();
        BloomFilter filter = HoodieIOFactory.getIOFactory((HoodieStorage)this.storage).getFileFormatUtils(table.getBaseFileFormat()).readBloomFilterFromMetadata(this.storage, new StoragePath(filePath.toUri()));
        for (HoodieRecord hoodieRecord : records) {
            Assertions.assertTrue((boolean)filter.mightContain(hoodieRecord.getRecordKey()));
        }
        List fileRecords = HoodieIOFactory.getIOFactory((HoodieStorage)this.storage).getFileFormatUtils(table.getBaseFileFormat()).readAvroRecords(this.storage, new StoragePath(filePath.toUri()));
        int index = 0;
        for (GenericRecord record : fileRecords) {
            Assertions.assertEquals((Object)((HoodieRecord)records.get(index)).getRecordKey(), (Object)record.get("_row_key").toString());
            ++index;
        }
        String updateRecordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
        RawTripTestPayload updateRowChanges1 = new RawTripTestPayload(updateRecordStr1);
        HoodieAvroRecord updatedRecord1 = new HoodieAvroRecord(new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), (HoodieRecordPayload)updateRowChanges1);
        RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4);
        HoodieAvroRecord insertedRecord1 = new HoodieAvroRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), (HoodieRecordPayload)rowChange4);
        List<HoodieRecord> updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1);
        Thread.sleep(1000L);
        String newCommitTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        writeClient.startCommitWithTime(newCommitTime);
        List statuses = writeClient.upsert(this.jsc.parallelize(updatedRecords), newCommitTime).collect();
        allFiles = this.getIncrementalFiles(partitionPath, firstCommitTime, -1);
        Assertions.assertEquals((int)1, (int)allFiles.length);
        Assertions.assertEquals((Object)FSUtils.getFileId((String)filePath.getName()), (Object)FSUtils.getFileId((String)allFiles[0].getPath().getName()));
        Path updatedFilePath = allFiles[0].getPath();
        BloomFilter updatedFilter = HoodieIOFactory.getIOFactory((HoodieStorage)this.metaClient.getStorage()).getFileFormatUtils(this.metaClient.getTableConfig().getBaseFileFormat()).readBloomFilterFromMetadata(this.storage, new StoragePath(updatedFilePath.toUri()));
        for (HoodieRecord hoodieRecord : records) {
            Assertions.assertTrue((boolean)updatedFilter.mightContain(hoodieRecord.getRecordKey()));
        }
        Assertions.assertTrue((boolean)updatedFilter.mightContain(insertedRecord1.getRecordKey()));
        records.add(insertedRecord1);
        ParquetReader updatedReader = ParquetReader.builder((ReadSupport)new AvroReadSupport(), (Path)updatedFilePath).build();
        index = 0;
        while ((genericRecord = (GenericRecord)updatedReader.read()) != null) {
            Assertions.assertEquals((Object)genericRecord.get("_row_key").toString(), (Object)((HoodieRecord)records.get(index)).getRecordKey());
            if (index == 0) {
                Assertions.assertEquals((Object)"15", (Object)genericRecord.get("number").toString());
            }
            ++index;
        }
        updatedReader.close();
        WriteStatus writeStatus = (WriteStatus)statuses.get(0);
        Assertions.assertEquals((int)1, (int)statuses.size(), (String)"Should be only one file generated");
        Assertions.assertEquals((long)4L, (long)writeStatus.getStat().getNumWrites());
    }

    private FileStatus[] getIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) throws Exception {
        HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat();
        JobConf jobConf = new JobConf((Configuration)this.storageConf.unwrap());
        hoodieInputFormat.setConf((Configuration)jobConf);
        HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath, (HoodieTableType)HoodieTableType.COPY_ON_WRITE);
        this.setupIncremental(jobConf, startCommitTime, numCommitsToPull);
        FileInputFormat.setInputPaths((JobConf)jobConf, (String)Paths.get(this.basePath, partitionPath).toString());
        return hoodieInputFormat.listStatus(jobConf);
    }

    private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
        String modePropertyName = String.format("hoodie.%s.consume.mode", "raw_trips");
        jobConf.set(modePropertyName, "INCREMENTAL");
        String startCommitTimestampName = String.format("hoodie.%s.consume.start.timestamp", "raw_trips");
        jobConf.set(startCommitTimestampName, startCommit);
        String maxCommitPulls = String.format("hoodie.%s.consume.max.commits", "raw_trips");
        jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
    }

    private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {
        ArrayList<HoodieRecord> records = new ArrayList<HoodieRecord>();
        for (int i = 0; i < n; ++i) {
            String recordStr = String.format("{\"_row_key\":\"%s\",\"time\":\"%s\",\"number\":%d}", UUID.randomUUID().toString(), time, i);
            RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
            records.add((HoodieRecord)new HoodieAvroRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), (HoodieRecordPayload)rowChange));
        }
        return records;
    }

    @Test
    public void testMetadataAggregateFromWriteStatus() throws Exception {
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
        String firstCommitTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
        String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
        String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
        ArrayList<HoodieAvroRecord> records = new ArrayList<HoodieAvroRecord>();
        RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
        records.add(new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), (HoodieRecordPayload)rowChange1));
        RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
        records.add(new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), (HoodieRecordPayload)rowChange2));
        RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
        records.add(new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), (HoodieRecordPayload)rowChange3));
        SparkInsertCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(this.context, config, (HoodieTable)table, firstCommitTime, this.context.parallelize(records));
        List writeStatuses = this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestCopyOnWriteActionExecutor.lambda$testMetadataAggregateFromWriteStatus$d2d6cc3$1((BaseSparkCommitActionExecutor)actionExecutor, records, arg_0)).flatMap(Transformations::flattenAsIterator).collect();
        Map allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses((List)writeStatuses);
        Assertions.assertTrue((boolean)allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
        Assertions.assertEquals((Object)"6", allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
    }

    private void verifyStatusResult(List<WriteStatus> statuses, Map<String, Long> expectedPartitionNumRecords) {
        HashMap<String, Long> actualPartitionNumRecords = new HashMap<String, Long>();
        for (int i = 0; i < statuses.size(); ++i) {
            WriteStatus writeStatus = statuses.get(i);
            String partitionPath = writeStatus.getPartitionPath();
            actualPartitionNumRecords.put(partitionPath, actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + writeStatus.getTotalRecords());
            Assertions.assertEquals((int)0, (int)writeStatus.getFailedRecords().size());
        }
        Assertions.assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
    }

    @Test
    public void testInsertRecords() throws Exception {
        HoodieWriteConfig config = this.makeHoodieClientConfig();
        String instantTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        List<HoodieRecord> records = this.newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
        records.addAll(this.newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
        List<HoodieRecord> recs2 = records;
        SparkInsertPreppedCommitActionExecutor actionExecutor = new SparkInsertPreppedCommitActionExecutor(this.context, config, (HoodieTable)table, instantTime, this.context.parallelize(recs2));
        List returnedStatuses = this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestCopyOnWriteActionExecutor.lambda$testInsertRecords$58cec836$1((BaseSparkCommitActionExecutor)actionExecutor, recs2, arg_0)).flatMap(Transformations::flattenAsIterator).collect();
        Assertions.assertEquals((int)2, (int)returnedStatuses.size());
        HashMap<String, Long> expectedPartitionNumRecords = new HashMap<String, Long>();
        expectedPartitionNumRecords.put("2016/01/31", 10L);
        expectedPartitionNumRecords.put("2016/02/01", 1L);
        this.verifyStatusResult(returnedStatuses, expectedPartitionNumRecords);
        records = this.newHoodieRecords(1, "2016-01-31T03:16:41.415Z");
        records.addAll(this.newHoodieRecords(5, "2016-02-01T03:16:41.415Z"));
        records.addAll(this.newHoodieRecords(1, "2016-02-02T03:16:41.415Z"));
        List<HoodieRecord> recs3 = records;
        SparkUpsertPreppedCommitActionExecutor newActionExecutor = new SparkUpsertPreppedCommitActionExecutor(this.context, config, (HoodieTable)table, instantTime, this.context.parallelize(recs3));
        returnedStatuses = this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestCopyOnWriteActionExecutor.lambda$testInsertRecords$5c7863df$1((BaseSparkCommitActionExecutor)newActionExecutor, recs3, arg_0)).flatMap(Transformations::flattenAsIterator).collect();
        Assertions.assertEquals((int)3, (int)returnedStatuses.size());
        expectedPartitionNumRecords.clear();
        expectedPartitionNumRecords.put("2016/01/31", 1L);
        expectedPartitionNumRecords.put("2016/02/01", 5L);
        expectedPartitionNumRecords.put("2016/02/02", 1L);
        this.verifyStatusResult(returnedStatuses, expectedPartitionNumRecords);
    }

    @Test
    public void testFileSizeUpsertRecords() throws Exception {
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(65536L).hfileMaxFileSize(65536L).parquetBlockSize(65536).parquetPageSize(65536).build()).build();
        String instantTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        ArrayList<HoodieAvroRecord> records = new ArrayList<HoodieAvroRecord>();
        for (int i = 0; i < 2050; ++i) {
            String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString() + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
            RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
            records.add(new HoodieAvroRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), (HoodieRecordPayload)rowChange));
        }
        SparkUpsertCommitActionExecutor actionExecutor = new SparkUpsertCommitActionExecutor(this.context, config, (HoodieTable)table, instantTime, this.context.parallelize(records));
        this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestCopyOnWriteActionExecutor.lambda$testFileSizeUpsertRecords$a0c2d401$1((BaseSparkCommitActionExecutor)actionExecutor, records, arg_0)).map(Transformations::flatten).collect();
        int counts = 0;
        for (File file : Paths.get(this.basePath, "2016/01/31").toFile().listFiles()) {
            if (!file.getName().endsWith(table.getBaseFileExtension()) || !FSUtils.getCommitTime((String)file.getName()).equals(instantTime)) continue;
            LOG.info(file.getName() + "-" + file.length());
            ++counts;
        }
        Assertions.assertEquals((int)3, (int)counts, (String)"If the number of records are more than 1150, then there should be a new file");
    }

    @Test
    public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024000L).hfileMaxFileSize(1024000L).build()).build();
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        String instantTime = "000";
        List inserts = this.dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
        SparkInsertCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(this.context, config, (HoodieTable)table, instantTime, this.context.parallelize(inserts));
        List ws = this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestCopyOnWriteActionExecutor.lambda$testInsertUpsertWithHoodieAvroPayload$7e0db4b8$1((BaseSparkCommitActionExecutor)actionExecutor, inserts, arg_0)).map(Transformations::flatten).collect();
        WriteStatus writeStatus = (WriteStatus)((List)ws.get(0)).get(0);
        String fileId = writeStatus.getFileId();
        this.metaClient.getStorage().create(new StoragePath(Paths.get(this.basePath, ".hoodie", "000.commit").toString())).close();
        List updates = this.dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts);
        String partitionPath = writeStatus.getPartitionPath();
        long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
        table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient));
        SparkUpsertCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(this.context, config, (HoodieTable)table, instantTime, this.context.parallelize(updates));
        List updateStatus = this.jsc.parallelize(Arrays.asList(1)).map(arg_0 -> TestCopyOnWriteActionExecutor.lambda$testInsertUpsertWithHoodieAvroPayload$21b27613$1((BaseSparkCommitActionExecutor)newActionExecutor, partitionPath, fileId, updates, arg_0)).map(Transformations::flatten).collect();
        Assertions.assertEquals((long)((long)updates.size() - numRecordsInPartition), (long)((WriteStatus)((List)updateStatus.get(0)).get(0)).getTotalErrorRecords());
    }

    private void testBulkInsertRecords(String bulkInsertMode) {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build();
        String instantTime = HoodieTestTable.makeNewCommitTime();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);
        writeClient.startCommitWithTime(instantTime);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        JavaRDD<HoodieRecord> inputRecords = TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(this.jsc);
        SparkBulkInsertCommitActionExecutor bulkInsertExecutor = new SparkBulkInsertCommitActionExecutor(this.context, config, (HoodieTable)table, instantTime, (HoodieData)HoodieJavaRDD.of(inputRecords), Option.empty());
        List returnedStatuses = ((HoodieData)bulkInsertExecutor.execute().getWriteStatuses()).collectAsList();
        this.verifyStatusResult(returnedStatuses, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(inputRecords));
    }

    @ParameterizedTest(name="[{index}] {0}")
    @ValueSource(strings={"global_sort", "partition_sort", "none"})
    public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode) throws Exception {
        this.testBulkInsertRecords(bulkInsertMode);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testPartitionMetafileFormat(boolean partitionMetafileUseBaseFormat) throws Exception {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").build();
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        Assertions.assertFalse((boolean)table.getPartitionMetafileFormat().isPresent());
        if (partitionMetafileUseBaseFormat) {
            Properties properties = new Properties();
            properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
            this.initMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
            this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
            Assertions.assertTrue((boolean)this.metaClient.getTableConfig().getPartitionMetafileFormat().isPresent());
            table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            Assertions.assertTrue((boolean)table.getPartitionMetafileFormat().isPresent());
        }
        String instantTime = HoodieTestTable.makeNewCommitTime();
        SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config);
        writeClient.startCommitWithTime(instantTime);
        JavaRDD<HoodieRecord> inputRecords = TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(this.jsc, 50);
        writeClient.bulkInsert(inputRecords, instantTime);
        StoragePath partitionPath = new StoragePath(this.basePath, "2016/03/15");
        Assertions.assertTrue((boolean)HoodiePartitionMetadata.hasPartitionMetadata((HoodieStorage)this.storage, (StoragePath)partitionPath));
        Option metafilePath = HoodiePartitionMetadata.getPartitionMetafilePath((HoodieStorage)this.storage, (StoragePath)partitionPath);
        if (partitionMetafileUseBaseFormat) {
            Assertions.assertTrue((boolean)((StoragePath)metafilePath.get()).toString().endsWith(table.getBaseFileFormat().getFileExtension()));
        } else {
            Assertions.assertTrue((boolean)((StoragePath)metafilePath.get()).toString().endsWith(".hoodie_partition_metadata"));
        }
        HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.storage, partitionPath);
        partitionMetadata.readFromFS();
        Assertions.assertTrue((partitionMetadata.getPartitionDepth() == 3 ? 1 : 0) != 0);
        Assertions.assertTrue((boolean)((String)partitionMetadata.readPartitionCreatedCommitTime().get()).equals(instantTime));
    }

    private static /* synthetic */ Iterator lambda$testInsertUpsertWithHoodieAvroPayload$21b27613$1(BaseSparkCommitActionExecutor newActionExecutor, String partitionPath, String fileId, List updates, Integer x) throws Exception {
        return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
    }

    private static /* synthetic */ Iterator lambda$testInsertUpsertWithHoodieAvroPayload$7e0db4b8$1(BaseSparkCommitActionExecutor actionExecutor, List inserts, Integer x) throws Exception {
        return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
    }

    private static /* synthetic */ Iterator lambda$testFileSizeUpsertRecords$a0c2d401$1(BaseSparkCommitActionExecutor actionExecutor, List records, Integer i) throws Exception {
        return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
    }

    private static /* synthetic */ Iterator lambda$testInsertRecords$5c7863df$1(BaseSparkCommitActionExecutor newActionExecutor, List recs3, Integer x) throws Exception {
        return newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator());
    }

    private static /* synthetic */ Iterator lambda$testInsertRecords$58cec836$1(BaseSparkCommitActionExecutor actionExecutor, List recs2, Integer x) throws Exception {
        return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator());
    }

    private static /* synthetic */ Iterator lambda$testMetadataAggregateFromWriteStatus$d2d6cc3$1(BaseSparkCommitActionExecutor actionExecutor, List records, Integer x) throws Exception {
        return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
    }

    private /* synthetic */ Pair lambda$testMakeNewPath$d4eb593f$1(String partitionPath, HoodieWriteConfig config, String instantTime, HoodieTable table, String fileName, Integer x) throws Exception {
        HoodieRecord record = (HoodieRecord)Mockito.mock(HoodieRecord.class);
        Mockito.when((Object)record.getPartitionPath()).thenReturn((Object)partitionPath);
        String writeToken = FSUtils.makeWriteToken((int)TaskContext.getPartitionId(), (int)TaskContext.get().stageId(), (long)TaskContext.get().taskAttemptId());
        HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, (TaskContextSupplier)this.supplier);
        Pair result = Pair.of((Object)io.makeNewPath(record.getPartitionPath()), (Object)writeToken);
        io.close();
        return result;
    }
}

