/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.functional;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.api.java.JavaRDD;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="functional")
public class TestSparkNonBlockingConcurrencyControl
extends SparkClientFunctionalTestHarness {
    // Avro record schema (as JSON) for the test rows: the five _hoodie_* meta columns followed by
    // id, name, age, ts and part. Every field is declared as a union with "null".
    String jsonSchema = "{\n  \"type\": \"record\",\n  \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n  \"fields\": [\n    {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n    {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n    {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n  ]\n}";
    // Parsed form of jsonSchema; assigned in setUp() before each test.
    private Schema schema;
    // Meta client of the table under test; assigned by each test and reloaded inside writeData().
    private HoodieTableMetaClient metaClient;

    /**
     * Parses {@code jsonSchema} into the Avro {@code Schema} that the record-building helpers use.
     */
    @BeforeEach
    public void setUp() throws Exception {
        Schema.Parser schemaParser = new Schema.Parser();
        this.schema = schemaParser.parse(this.jsonSchema);
    }

    /**
     * Two write clients each insert a different partial version of the same key ({@code id1} in
     * {@code par1}) before either of them commits; client1 then commits, followed by client2.
     *
     * <p>After client1 schedules and runs a compaction, the data file read by
     * {@code checkWrittenData} is expected to hold one row that combines {@code name=Danny} from
     * the first write with {@code age=23} and {@code ts=2} from the second write.
     */
    @Test
    public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        // Writer 1: name is set, age is left empty. doCommit=false keeps the instant uncommitted.
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        String insertTime1 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
        // Writer 2: same key, age is set, name is left empty; also not committed yet.
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = this.writeData(client2, insertTime2, dataset2, false, WriteOperationType.INSERT);
        // Commit in the same order the instants were created: writer 1 first, then writer 2.
        client1.commitStats(insertTime1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        client2.commitStats(insertTime2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        // fileExists() ignores every entry whose name starts with ".", so this only passes while
        // no other file exists under the temp dir.
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.fileExists(), (String)"No base data files should have been created");
        String compactionTime = (String)client1.scheduleCompaction(Option.empty()).get();
        client1.compact(compactionTime);
        // Expected row layout: record key, partition path, id, name, age, ts, part.
        Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        this.checkWrittenData(result, 1);
    }

    /**
     * Runs a compaction while a second writer's instant is left uncommitted.
     *
     * <p>client1 and client2 both write key {@code id1}; only client1's write is committed before
     * the compaction is scheduled, and client2's write is never committed in this test. client1
     * then writes and commits a third row ({@code id3}) after the compaction has been scheduled
     * but before it is executed. The data file read by {@code checkWrittenData} afterwards is
     * expected to contain only client1's first row, with {@code age} still {@code null}.
     */
    @Test
    public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        // The write statuses are discarded on purpose: insertTime2 is never committed below.
        this.writeData(client2, insertTime2, dataset2, false, WriteOperationType.INSERT);
        client1.commitStats(insertTime1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        // Schedule the compaction now, but execute it only after the third write is committed.
        String compactionTime = (String)client1.scheduleCompaction(Option.empty()).get();
        List<String> dataset3 = Collections.singletonList("id3,Julian,53,4,par1");
        String insertTime3 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses3 = this.writeData(client1, insertTime3, dataset3, false, WriteOperationType.INSERT);
        client1.commitStats(insertTime3, writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        client1.compact(compactionTime);
        // Neither the uncommitted id1 update (age=23) nor the id3 row is expected in the checked file.
        Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,null,1,par1]");
        this.checkWrittenData(result, 1);
    }

    /**
     * Checks file ids, file kinds and timeline ordering when an INSERT and an UPSERT on the same key
     * overlap, optionally preceded by a committed BULK_INSERT.
     *
     * <p>All writes use the full-update payload. Every write status must report the same file id.
     * The optional bulk insert must report non-log files, while the two overlapping writers must
     * report log files. The timeline assertions then compare completion times against requested
     * times: the bulk insert (when present) completed before the next instant was requested, and
     * the two overlapping instants each completed after the other one was requested.
     *
     * @param bulkInsertFirst whether a committed BULK_INSERT precedes the two overlapping writers
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMultiBaseFile(boolean bulkInsertFirst) throws Exception {
        HoodieWriteConfig writeConfig = this.createHoodieWriteConfig(true);
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) writeConfig.getProps());
        String expectedFileId = null;

        if (bulkInsertFirst) {
            // Step 0 (optional): a bulk insert that is written and committed on its own.
            SparkRDDWriteClient bulkClient = this.getHoodieWriteClient(writeConfig);
            List<String> bulkRows = Collections.singletonList("id0,Danny,0,0,par1");
            String bulkInstant = bulkClient.createNewInstantTime();
            List<WriteStatus> bulkStatuses = this.writeData(bulkClient, bulkInstant, bulkRows, false, WriteOperationType.BULK_INSERT, true);
            bulkClient.commitStats(bulkInstant, bulkStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
            for (WriteStatus bulkStatus : bulkStatuses) {
                if (expectedFileId != null) {
                    org.junit.jupiter.api.Assertions.assertEquals(expectedFileId, bulkStatus.getFileId());
                } else {
                    expectedFileId = bulkStatus.getFileId();
                }
                String writtenName = new StoragePath(bulkStatus.getStat().getPath()).getName();
                org.junit.jupiter.api.Assertions.assertFalse(FSUtils.isLogFile(writtenName));
            }
            bulkClient.close();
        }

        // Step 1: the first writer inserts without committing.
        SparkRDDWriteClient insertClient = this.getHoodieWriteClient(writeConfig);
        List<String> insertRows = Collections.singletonList("id1,Danny,22,1,par1");
        String insertInstant = insertClient.createNewInstantTime();
        List<WriteStatus> insertStatuses = this.writeData(insertClient, insertInstant, insertRows, false, WriteOperationType.INSERT, true);
        for (WriteStatus insertStatus : insertStatuses) {
            if (expectedFileId != null) {
                org.junit.jupiter.api.Assertions.assertEquals(expectedFileId, insertStatus.getFileId());
            } else {
                expectedFileId = insertStatus.getFileId();
            }
            String writtenName = new StoragePath(insertStatus.getStat().getPath()).getName();
            org.junit.jupiter.api.Assertions.assertTrue(FSUtils.isLogFile(writtenName));
        }

        // Step 2: the second writer upserts the same key, again without committing.
        SparkRDDWriteClient upsertClient = this.getHoodieWriteClient(writeConfig);
        List<String> upsertRows = Collections.singletonList("id1,Danny,23,2,par1");
        String upsertInstant = upsertClient.createNewInstantTime();
        List<WriteStatus> upsertStatuses = this.writeData(upsertClient, upsertInstant, upsertRows, false, WriteOperationType.UPSERT, true);
        for (WriteStatus upsertStatus : upsertStatuses) {
            org.junit.jupiter.api.Assertions.assertEquals(expectedFileId, upsertStatus.getFileId());
            String writtenName = new StoragePath(upsertStatus.getStat().getPath()).getName();
            org.junit.jupiter.api.Assertions.assertTrue(FSUtils.isLogFile(writtenName));
        }

        // Step 3: commit both overlapping writers, first writer first.
        insertClient.commitStats(insertInstant, insertStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        upsertClient.commitStats(upsertInstant, upsertStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        insertClient.close();
        upsertClient.close();

        // Step 4: verify the timeline.
        this.metaClient.reloadActiveTimeline();
        List<HoodieInstant> instants = this.metaClient.getActiveTimeline().getInstants();
        if (!bulkInsertFirst) {
            org.junit.jupiter.api.Assertions.assertEquals(2, instants.size());
            org.junit.jupiter.api.Assertions.assertTrue(
                Long.parseLong(instants.get(0).getCompletionTime()) > Long.parseLong(instants.get(1).requestedTime()));
            org.junit.jupiter.api.Assertions.assertTrue(
                Long.parseLong(instants.get(1).getCompletionTime()) > Long.parseLong(instants.get(0).requestedTime()));
        } else {
            org.junit.jupiter.api.Assertions.assertEquals(3, instants.size());
            org.junit.jupiter.api.Assertions.assertTrue(
                Long.parseLong(instants.get(0).getCompletionTime()) < Long.parseLong(instants.get(1).requestedTime()));
            org.junit.jupiter.api.Assertions.assertTrue(
                Long.parseLong(instants.get(1).getCompletionTime()) > Long.parseLong(instants.get(2).requestedTime()));
            org.junit.jupiter.api.Assertions.assertTrue(
                Long.parseLong(instants.get(2).getCompletionTime()) > Long.parseLong(instants.get(1).requestedTime()));
        }
    }

    /**
     * Case 1: an INSERT is written and committed, then a BULK_INSERT on the same key is written
     * and committed (both {@code writeData} calls pass {@code doCommit=true}, so the two writes do
     * not overlap).
     *
     * <p>After compaction the checked data file is expected to hold one row merging both writes.
     */
    @Test
    public void testBulkInsertAndInsertConcurrentCase1() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        // doCommit=true: the INSERT is committed inside writeData before the bulk insert starts.
        this.writeData(client1, insertTime1, dataset1, true, WriteOperationType.INSERT);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        this.writeData(client2, insertTime2, dataset2, true, WriteOperationType.BULK_INSERT);
        String compactionTime = (String)client1.scheduleCompaction(Option.empty()).get();
        client1.compact(compactionTime);
        Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        this.checkWrittenData(result, 1);
    }

    /**
     * Case 2: the INSERT instant is created and written first, the BULK_INSERT instant second, and
     * neither is committed while the other is being written. The INSERT commits first.
     *
     * <p>Committing the BULK_INSERT afterwards is expected to throw
     * {@link HoodieWriteConflictException}.
     */
    @Test
    public void testBulkInsertAndInsertConcurrentCase2() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = this.writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
        // The INSERT commits first; the later BULK_INSERT commit must be rejected.
        client1.commitStats(insertTime1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        org.junit.jupiter.api.Assertions.assertThrows(HoodieWriteConflictException.class, () -> client2.commitStats(insertTime2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType()));
    }

    /**
     * Case 3: the BULK_INSERT instant is created and written first, the INSERT instant second, and
     * neither is committed while the other is being written. The INSERT commits first.
     *
     * <p>Committing the BULK_INSERT afterwards is expected to throw
     * {@link HoodieWriteConflictException}.
     */
    @Test
    public void testBulkInsertAndInsertConcurrentCase3() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        // client2 (BULK_INSERT) obtains its instant time before client1 (INSERT) does.
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = this.writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
        // The INSERT commits first even though it started later; the BULK_INSERT commit must fail.
        client1.commitStats(insertTime1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        org.junit.jupiter.api.Assertions.assertThrows(HoodieWriteConflictException.class, () -> client2.commitStats(insertTime2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType()));
    }

    /**
     * Case 4: the INSERT instant is created and written first, the BULK_INSERT instant second, and
     * neither is committed while the other is being written. The BULK_INSERT commits first, then
     * the INSERT.
     *
     * <p>Both commits are expected to succeed; after compaction the checked data file is expected
     * to hold one row merging both writes.
     */
    @Test
    public void testBulkInsertAndInsertConcurrentCase4() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = this.writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
        // Commit order is the reverse of the start order: BULK_INSERT first, then INSERT.
        client2.commitStats(insertTime2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        client1.commitStats(insertTime1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        String compactionTime = (String)client1.scheduleCompaction(Option.empty()).get();
        client1.compact(compactionTime);
        Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        this.checkWrittenData(result, 1);
    }

    /**
     * Case 5: the BULK_INSERT instant is created and written first, the INSERT instant second, and
     * neither is committed while the other is being written. The BULK_INSERT commits first, then
     * the INSERT.
     *
     * <p>Both commits are expected to succeed; after compaction the checked data file is expected
     * to hold one row merging both writes.
     */
    @Test
    public void testBulkInsertAndInsertConcurrentCase5() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        // client2 (BULK_INSERT) obtains its instant time before client1 (INSERT) does.
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        List<WriteStatus> writeStatuses2 = this.writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        List<WriteStatus> writeStatuses1 = this.writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
        // Commit order matches the start order: BULK_INSERT first, then INSERT.
        client2.commitStats(insertTime2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        client1.commitStats(insertTime1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType());
        String compactionTime = (String)client1.scheduleCompaction(Option.empty()).get();
        client1.compact(compactionTime);
        Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        this.checkWrittenData(result, 1);
    }

    /**
     * Case 6: a BULK_INSERT is written and committed, then an INSERT on the same key is written
     * and committed (both {@code writeData} calls pass {@code doCommit=true}, so the two writes do
     * not overlap).
     *
     * <p>After compaction the checked data file is expected to hold one row merging both writes.
     */
    @Test
    public void testBulkInsertAndInsertConcurrentCase6() throws Exception {
        HoodieWriteConfig config = this.createHoodieWriteConfig();
        this.metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        SparkRDDWriteClient client1 = this.getHoodieWriteClient(config);
        List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
        String insertTime1 = client1.createNewInstantTime();
        // doCommit=true: the BULK_INSERT is committed inside writeData before the insert starts.
        this.writeData(client1, insertTime1, dataset1, true, WriteOperationType.BULK_INSERT);
        SparkRDDWriteClient client2 = this.getHoodieWriteClient(config);
        List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
        String insertTime2 = client2.createNewInstantTime();
        this.writeData(client2, insertTime2, dataset2, true, WriteOperationType.INSERT);
        String compactionTime = (String)client1.scheduleCompaction(Option.empty()).get();
        client1.compact(compactionTime);
        Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        this.checkWrittenData(result, 1);
    }

    /**
     * Builds the write config used by the partial-update tests (payload class
     * {@link PartialUpdateAvroPayload}).
     */
    private HoodieWriteConfig createHoodieWriteConfig() {
        return this.createHoodieWriteConfig(false);
    }

    /**
     * Builds the write config shared by all tests in this class: bucket layout and bucket index
     * with one bucket, non-blocking concurrency control, an in-process lock provider, lazy
     * failed-writes cleaning, direct markers, auto-commit disabled and {@code ts} as the
     * precombine field.
     *
     * @param fullUpdate when {@code true} the payload class is
     *                   {@link OverwriteWithLatestAvroPayload}; otherwise
     *                   {@link PartialUpdateAvroPayload}
     * @return the assembled write config rooted at {@code basePath()}
     */
    private HoodieWriteConfig createHoodieWriteConfig(boolean fullUpdate) {
        String payloadClassName = fullUpdate
            ? OverwriteWithLatestAvroPayload.class.getName()
            : PartialUpdateAvroPayload.class.getName();
        Properties indexProps = this.getPropertiesForKeyGen(true);
        indexProps.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        String tablePath = this.basePath();
        return HoodieWriteConfig.newBuilder()
            .withProps(Collections.singletonMap(HoodieTableConfig.PRECOMBINE_FIELD.key(), "ts"))
            .forTable("test")
            .withPath(tablePath)
            .withSchema(this.jsonSchema)
            .withParallelism(2, 2)
            .withAutoCommit(false)
            .withRecordMergeMode(RecordMergeMode.CUSTOM)
            .withPayloadConfig(HoodiePayloadConfig.newBuilder()
                .withPayloadClass(payloadClassName)
                .build())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .build())
            .withStorageConfig(HoodieStorageConfig.newBuilder()
                .parquetMaxFileSize(1024L)
                .build())
            .withLayoutConfig(HoodieLayoutConfig.newBuilder()
                .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
                .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
                .build())
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .fromProperties(indexProps)
                .withIndexType(HoodieIndex.IndexType.BUCKET)
                .withBucketNum("1")
                .build())
            .withPopulateMetaFields(true)
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
            .withMarkersType(MarkerType.DIRECT.name())
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class)
                .withConflictResolutionStrategy((ConflictResolutionStrategy) new BucketIndexConcurrentFileWritesConflictResolutionStrategy())
                .build())
            .build();
    }

    /**
     * Verifies the newest visible data file of every partition directory under the test's temp dir.
     *
     * <p>Entries whose name starts with {@code "."} are ignored, both among the partition
     * directories and among the files inside them. For each remaining partition directory, the
     * file with the greatest {@code FSUtils.getCommitTime(fileName)} is read through an Avro
     * Parquet reader, every row is rendered with {@code filterOutVariables}, the rendered rows are
     * sorted, and the list's {@code toString()} is compared with
     * {@code expected.get(partitionDirName)}.
     *
     * @param expected   partition directory name mapped to the expected rendering of its rows
     * @param partitions expected number of visible partition directories
     * @throws IOException if the Parquet file cannot be opened, read or closed
     */
    private void checkWrittenData(Map<String, String> expected, int partitions) throws IOException {
        File baseFile = this.tempDir.toFile();
        // A JUnit assertion (rather than the `assert` keyword) so the check also runs without -ea.
        org.junit.jupiter.api.Assertions.assertTrue(baseFile.isDirectory(), "Base path must be a directory: " + baseFile);
        FileFilter filter = file -> !file.getName().startsWith(".");
        File[] partitionDirs = baseFile.listFiles(filter);
        org.junit.jupiter.api.Assertions.assertNotNull(partitionDirs);
        MatcherAssert.assertThat(partitionDirs.length, CoreMatchers.is(partitions));
        for (File partitionDir : partitionDirs) {
            File[] dataFiles = partitionDir.listFiles(filter);
            org.junit.jupiter.api.Assertions.assertNotNull(dataFiles);
            // Fail with a readable message when the partition has no visible file, instead of the
            // ArrayIndexOutOfBoundsException an eager `dataFiles[0]` fallback would raise.
            File latestDataFile = Arrays.stream(dataFiles)
                .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))
                .orElseThrow(() -> new AssertionError("No visible data file found in partition directory " + partitionDir));
            List<String> readBuffer = new ArrayList<>();
            // try-with-resources guarantees the reader is closed even when reading or rendering fails.
            try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build()) {
                for (GenericRecord nextRecord = reader.read(); nextRecord != null; nextRecord = reader.read()) {
                    readBuffer.add(TestSparkNonBlockingConcurrencyControl.filterOutVariables(nextRecord));
                }
            }
            readBuffer.sort(Comparator.naturalOrder());
            MatcherAssert.assertThat(readBuffer.toString(), CoreMatchers.is(expected.get(partitionDir.getName())));
        }
    }

    /**
     * Renders a record as a comma-separated string of, in order: {@code _hoodie_record_key},
     * {@code _hoodie_partition_path}, {@code id}, {@code name}, {@code age}, {@code ts} and
     * {@code part}. The remaining meta columns are left out.
     *
     * <p>The first five fields go through {@code getFieldValue}, so a missing value is rendered as
     * the text {@code null} by {@code String.join}. {@code ts} and {@code part} are dereferenced
     * directly and therefore must be non-null.
     *
     * @param genericRecord record read back from a data file
     * @return the comma-joined field values
     */
    private static String filterOutVariables(GenericRecord genericRecord) {
        String[] nullableFieldNames = {"_hoodie_record_key", "_hoodie_partition_path", "id", "name", "age"};
        List<String> rendered = new ArrayList<>();
        for (String fieldName : nullableFieldNames) {
            rendered.add(TestSparkNonBlockingConcurrencyControl.getFieldValue(genericRecord, fieldName));
        }
        rendered.add(genericRecord.get("ts").toString());
        rendered.add(genericRecord.get("part").toString());
        return String.join(",", rendered);
    }

    /**
     * Returns the string form of a record field, or {@code null} when the field holds no value.
     *
     * @param genericRecord record to read from
     * @param fieldName     name of the field to look up
     * @return {@code toString()} of the field value, or {@code null} if the value is {@code null}
     */
    private static String getFieldValue(GenericRecord genericRecord, String fieldName) {
        Object value = genericRecord.get(fieldName);
        return value == null ? null : value.toString();
    }

    /**
     * Reports whether any visible regular file exists anywhere under the test's temp dir.
     *
     * <p>Directories are visited breadth-first. Entries whose name starts with {@code "."} are
     * skipped entirely, so such directories are not descended into.
     *
     * @return {@code true} as soon as a non-directory entry with a non-dot name is found,
     *         {@code false} if none exists
     * @throws NullPointerException if a visited directory cannot be listed
     */
    private boolean fileExists() {
        List<File> pendingDirs = new ArrayList<>();
        pendingDirs.add(this.tempDir.toFile());
        // Index-based traversal: directories appended during the scan are visited in insertion order.
        for (int next = 0; next < pendingDirs.size(); next++) {
            File[] children = Objects.requireNonNull(pendingDirs.get(next).listFiles());
            for (File child : children) {
                if (child.getName().startsWith(".")) {
                    continue;
                }
                if (!child.isDirectory()) {
                    return true;
                }
                pendingDirs.add(child);
            }
        }
        return false;
    }

    /**
     * Converts a comma-separated line {@code id,name,age,ts,part} into an Avro record of
     * {@code schema}. An empty token is stored as {@code null}; {@code age} is parsed as an int and
     * {@code ts} as a long.
     *
     * @param str the comma-separated line; must split into exactly five tokens
     * @return the populated record (the {@code _hoodie_*} fields are not set here)
     */
    private GenericRecord str2GenericRecord(String str) {
        GenericData.Record avroRecord = new GenericData.Record(this.schema);
        String[] tokens = str.split(",");
        ValidationUtils.checkArgument(tokens.length == 5, "Valid record must have 5 fields");

        String id = StringUtils.isNullOrEmpty(tokens[0]) ? null : tokens[0];
        avroRecord.put("id", id);

        String name = StringUtils.isNullOrEmpty(tokens[1]) ? null : tokens[1];
        avroRecord.put("name", name);

        Integer age = null;
        if (!StringUtils.isNullOrEmpty(tokens[2])) {
            age = Integer.parseInt(tokens[2]);
        }
        avroRecord.put("age", age);

        Long ts = null;
        if (!StringUtils.isNullOrEmpty(tokens[3])) {
            ts = Long.parseLong(tokens[3]);
        }
        avroRecord.put("ts", ts);

        String part = StringUtils.isNullOrEmpty(tokens[4]) ? null : tokens[4];
        avroRecord.put("part", part);
        return avroRecord;
    }

    /**
     * Turns comma-separated lines into Hoodie records keyed by ({@code id}, {@code part}), using
     * the {@code ts} field as the payload's ordering value.
     *
     * @param records    lines in the format accepted by {@code str2GenericRecord}
     * @param fullUpdate {@code true} to wrap each row in {@link OverwriteWithLatestAvroPayload},
     *                   {@code false} to wrap it in {@link PartialUpdateAvroPayload}
     * @return one Hoodie record per input line, in input order
     */
    private List<HoodieRecord> str2HoodieRecord(List<String> records, boolean fullUpdate) {
        List<HoodieRecord> hoodieRecords = new ArrayList<>();
        for (String line : records) {
            GenericRecord avroRecord = this.str2GenericRecord(line);
            HoodieRecordPayload payload;
            if (fullUpdate) {
                payload = new OverwriteWithLatestAvroPayload(avroRecord, (Comparable) ((Long) avroRecord.get("ts")));
            } else {
                payload = new PartialUpdateAvroPayload(avroRecord, (Comparable) ((Long) avroRecord.get("ts")));
            }
            HoodieKey key = new HoodieKey((String) avroRecord.get("id"), (String) avroRecord.get("part"));
            hoodieRecords.add(new HoodieAvroRecord(key, payload));
        }
        return hoodieRecords;
    }

    /**
     * Writes the given rows with the partial-update payload; see the six-argument overload.
     */
    private List<WriteStatus> writeData(SparkRDDWriteClient client, String instant, List<String> records, boolean doCommit, WriteOperationType operationType) {
        return this.writeData(client, instant, records, doCommit, operationType, false);
    }

    /**
     * Starts the commit {@code instant} on {@code client}, writes the rows with the requested
     * operation, asserts that no write errors were reported and optionally commits.
     *
     * <p>{@code metaClient} is reloaded before the commit is started and again before returning.
     *
     * @param client        write client performing the write
     * @param instant       instant time to start and write under
     * @param records       rows in the format accepted by {@code str2GenericRecord}
     * @param doCommit      when {@code true}, commits the write stats and asserts that the commit succeeded
     * @param operationType INSERT, UPSERT or BULK_INSERT
     * @param fullUpdate    payload selection, forwarded to {@code str2HoodieRecord}
     * @return the collected write statuses
     * @throws UnsupportedOperationException for any other operation type
     */
    private List<WriteStatus> writeData(SparkRDDWriteClient client, String instant, List<String> records, boolean doCommit, WriteOperationType operationType, boolean fullUpdate) {
        List<HoodieRecord> hoodieRecords = this.str2HoodieRecord(records, fullUpdate);
        JavaRDD<HoodieRecord> inputRdd = this.jsc().parallelize(hoodieRecords, 2);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        client.startCommitWithTime(instant);

        JavaRDD<WriteStatus> resultRdd;
        switch (operationType) {
            case INSERT:
                resultRdd = client.insert(inputRdd, instant);
                break;
            case UPSERT:
                resultRdd = client.upsert(inputRdd, instant);
                break;
            case BULK_INSERT:
                resultRdd = client.bulkInsert(inputRdd, instant);
                break;
            default:
                throw new UnsupportedOperationException(operationType + " is not supported yet in this test!");
        }
        List<WriteStatus> statuses = resultRdd.collect();
        Assertions.assertNoWriteErrors(statuses);

        if (doCommit) {
            boolean committed = client.commitStats(
                instant,
                statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
                Option.empty(),
                this.metaClient.getCommitActionType());
            org.junit.jupiter.api.Assertions.assertTrue(committed);
        }
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        return statuses;
    }
}

