/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHoodieMergeHandle
extends HoodieSparkClientTestHarness {
    /**
     * Runs before every test and invokes the harness initializers, in this order:
     * Spark contexts, base path, Hoodie storage, test data generator, meta client.
     *
     * @throws Exception if any of the harness initializers fails
     */
    @BeforeEach
    public void setUp() throws Exception {
        initSparkContexts();
        initPath();
        initHoodieStorage();
        initTestDataGenerator();
        initMetaClient();
    }

    /**
     * Runs after every test and delegates to the harness' {@code cleanupResources()}.
     *
     * @throws Exception if the harness cleanup fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    /**
     * Writes the same record keys many times into one partition and checks that a later upsert
     * rewrites every stored copy of those keys.
     *
     * <p>Sequence of commits:
     * <ul>
     *   <li>001 - bulk insert of 4 records plus 20 extra copies each of the first two keys (44 rows)</li>
     *   <li>002 - bulk insert of one more copy of the first key (45 rows)</li>
     *   <li>003 - bulk insert of 2 new records (47 rows)</li>
     *   <li>004 - upsert of the first two keys; row count stays 47 and every row carrying one of
     *       the two keys must show {@code rider-004} / {@code driver-004}</li>
     * </ul>
     *
     * @param diskMapType          spillable disk map implementation to configure on the writer
     * @param isCompressionEnabled value for the BitCask disk map compression setting
     * @throws Exception if the write client or any Spark action fails
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws Exception {
        // Generate data for a single partition only.
        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
        this.dataGen = new HoodieTestDataGenerator(new String[]{partitionPath});
        Properties properties = new Properties();
        properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
        properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled));
        HoodieWriteConfig cfg = this.getConfigBuilder().withProperties(properties).build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg)) {
            // ---- Commit 001: 4 inserts + 20 duplicates of each of the first two keys ----
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List<HoodieRecord> records = this.dataGen.generateInserts(newCommitTime, 4);
            HoodieRecord record1 = records.get(0);
            HoodieRecord record2 = records.get(1);
            for (HoodieRecord duplicated : new HoodieRecord[]{record1, record2}) {
                for (int i = 0; i < 20; ++i) {
                    records.add(this.dataGen.generateUpdateRecord(duplicated.getKey(), newCommitTime));
                }
            }
            JavaRDD<HoodieRecord> writeRecords = this.jsc.parallelize(records, 1);
            List<WriteStatus> statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors(statuses);
            HoodieTimeline timeline = this.reloadAndAssertCommits(1, newCommitTime, "Expecting a single commit.");
            org.junit.jupiter.api.Assertions.assertEquals((long)records.size(), HoodieClientTestUtils.readCommit(this.basePath, this.sqlContext, timeline, newCommitTime, true, HoodieTestUtils.INSTANT_GENERATOR).count(), "Must contain 44 records");

            // ---- Commit 002: one more copy of record1's key via bulk insert ----
            newCommitTime = "002";
            client.startCommitWithTime(newCommitTime);
            List<HoodieRecord> newRecords = new ArrayList<>();
            newRecords.add(this.dataGen.generateUpdateRecord(record1.getKey(), newCommitTime));
            writeRecords = this.jsc.parallelize(newRecords, 1);
            statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors(statuses);
            this.reloadAndAssertCommits(2, newCommitTime, "Expecting two commits.");
            Dataset<Row> dataSet = this.getRecords();
            org.junit.jupiter.api.Assertions.assertEquals(45L, dataSet.count(), "Must contain 45 records");

            // ---- Commit 003: two brand-new records ----
            newCommitTime = "003";
            client.startCommitWithTime(newCommitTime);
            newRecords = this.dataGen.generateInserts(newCommitTime, 2);
            writeRecords = this.jsc.parallelize(newRecords, 1);
            statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
            Assertions.assertNoWriteErrors(statuses);
            this.reloadAndAssertCommits(3, newCommitTime, "Expecting three commits.");
            dataSet = this.getRecords();
            org.junit.jupiter.api.Assertions.assertEquals(47L, dataSet.count(), "Must contain 47 records");

            // ---- Commit 004: upsert both duplicated keys ----
            newCommitTime = "004";
            client.startCommitWithTime(newCommitTime);
            List<HoodieRecord> updateRecords = new ArrayList<>();
            updateRecords.add(this.dataGen.generateUpdateRecord(record1.getKey(), newCommitTime));
            updateRecords.add(this.dataGen.generateUpdateRecord(record2.getKey(), newCommitTime));
            JavaRDD<HoodieRecord> updateRecordsRDD = this.jsc.parallelize(updateRecords, 1);
            statuses = client.upsert(updateRecordsRDD, newCommitTime).collect();
            Assertions.assertNoWriteErrors(statuses);
            // NOTE(review): the meta client is now reloaded after this commit as well, matching
            // commits 001-003, because metaClient.getActiveTimeline() is used further down.
            this.reloadAndAssertCommits(4, newCommitTime, "Expecting four commits.");
            dataSet = this.getRecords();
            org.junit.jupiter.api.Assertions.assertEquals(47L, dataSet.count(), "Must contain 47 records");

            // Collect once and reuse the rows for both the per-key checks and the file-name check.
            List<Row> rows = dataSet.collectAsList();
            String record1Key = record1.getKey().getRecordKey();
            String record2Key = record2.getKey().getRecordKey();
            int record1Count = 0;
            int record2Count = 0;
            for (Row row : rows) {
                String recordKey = row.getAs("_hoodie_record_key");
                String rider = row.getAs("rider");
                String driver = row.getAs("driver");
                if (record1Key.equals(recordKey)) {
                    ++record1Count;
                    org.junit.jupiter.api.Assertions.assertEquals("rider-004", rider);
                    org.junit.jupiter.api.Assertions.assertEquals("driver-004", driver);
                } else if (record2Key.equals(recordKey)) {
                    ++record2Count;
                    org.junit.jupiter.api.Assertions.assertEquals("rider-004", rider);
                    org.junit.jupiter.api.Assertions.assertEquals("driver-004", driver);
                } else {
                    // Rows of any other key were not part of commit 004 and must be untouched.
                    org.junit.jupiter.api.Assertions.assertNotEquals("rider-004", rider);
                    org.junit.jupiter.api.Assertions.assertNotEquals("driver-004", driver);
                }
            }
            // record1: 1 insert + 20 duplicates (001) + 1 copy (002); record2: 1 insert + 20 duplicates (001).
            org.junit.jupiter.api.Assertions.assertEquals(22, record1Count);
            org.junit.jupiter.api.Assertions.assertEquals(21, record2Count);

            // Every row's file-name metadata field must name one of the latest base files.
            HoodieTableFileSystemView tableView = this.getHoodieTableFileSystemView(this.metaClient, (HoodieTimeline)this.metaClient.getActiveTimeline(), HoodieTestTable.of(this.metaClient).listAllBaseFiles());
            Set<String> latestBaseFileNames = tableView.getLatestBaseFiles().map(BaseFile::getFileName).collect(Collectors.toSet());
            Set<String> metadataFilenameFieldRefs = rows.stream().map(row -> row.<String>getAs(HoodieRecord.FILENAME_METADATA_FIELD)).collect(Collectors.toSet());
            org.junit.jupiter.api.Assertions.assertEquals(latestBaseFileNames, metadataFilenameFieldRefs);
        }
    }

    /**
     * Reloads {@code metaClient}, reads a fresh commit/replace timeline and asserts both the
     * number of instants after "000" and the requested time of the last instant.
     *
     * @param expectedCommitCount  number of instants expected after instant "000"
     * @param expectedLatestCommit requested time the last instant must have
     * @param countMessage         failure message for the instant-count assertion
     * @return the timeline that was checked
     */
    private HoodieTimeline reloadAndAssertCommits(int expectedCommitCount, String expectedLatestCommit, String countMessage) {
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieTimeline timeline = HoodieTestUtils.TIMELINE_FACTORY.createActiveTimeline(this.metaClient).getCommitAndReplaceTimeline();
        org.junit.jupiter.api.Assertions.assertEquals(expectedCommitCount, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), countMessage);
        HoodieInstant latestInstant = timeline.lastInstant().get();
        org.junit.jupiter.api.Assertions.assertEquals(expectedLatestCommit, latestInstant.requestedTime(), "Latest commit should be " + expectedLatestCommit);
        return timeline;
    }

    /**
     * Checks the write/update/insert counters reported in the write stats across three commits:
     * 100 inserts ("100"), 100 updates of those records ("101"), and a mixed batch of
     * 100 new inserts plus the 100 updates ("102").
     *
     * <p>The "previous commit" marker of each stat is compared by value. Comparing it to the
     * literal {@code "null"} with {@code ==}/{@code !=} only tests object identity, which says
     * nothing about the marker's content.
     *
     * @param diskMapType          spillable disk map implementation to configure on the writer
     * @param isCompressionEnabled value for the BitCask disk map compression setting
     * @throws Exception if the write client or any Spark action fails
     */
    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testHoodieMergeHandleWriteStatMetrics(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
        properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled));
        HoodieWriteConfig config = this.getConfigBuilder().withProperties(properties).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(config)) {
            // ---- Commit 100: pure inserts ----
            String newCommitTime = "100";
            writeClient.startCommitWithTime(newCommitTime);
            List<HoodieRecord> records = this.dataGen.generateInserts(newCommitTime, 100);
            JavaRDD<HoodieRecord> recordsRDD = this.jsc.parallelize(records, 1);
            List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
            // NOTE(review): the previous form `getPrevCommit() != "null"` with `count() > 0` was an
            // identity check. Assumed intent: a first insert yields stats without a previous
            // commit, mirroring the `== 0` checks for the later commits - confirm.
            org.junit.jupiter.api.Assertions.assertTrue(countWithoutPreviousCommit(statuses) > 0L);
            assertWriteStatTotals(statuses, 100L, 0L, 100L);

            // ---- Commit 101: update every record written above ----
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            newCommitTime = "101";
            writeClient.startCommitWithTime(newCommitTime);
            List<HoodieRecord> updatedRecords = this.dataGen.generateUpdates(newCommitTime, records);
            JavaRDD<HoodieRecord> updatedRecordsRDD = this.jsc.parallelize(updatedRecords, 1);
            statuses = writeClient.upsert(updatedRecordsRDD, newCommitTime).collect();
            org.junit.jupiter.api.Assertions.assertEquals(0L, countWithoutPreviousCommit(statuses));
            assertWriteStatTotals(statuses, 100L, 100L, 0L);

            // ---- Commit 102: 100 new inserts together with the 100 updates ----
            newCommitTime = "102";
            writeClient.startCommitWithTime(newCommitTime);
            List<HoodieRecord> allRecords = this.dataGen.generateInserts(newCommitTime, 100);
            allRecords.addAll(updatedRecords);
            JavaRDD<HoodieRecord> allRecordsRDD = this.jsc.parallelize(allRecords, 1);
            statuses = writeClient.upsert(allRecordsRDD, newCommitTime).collect();
            org.junit.jupiter.api.Assertions.assertEquals(0L, countWithoutPreviousCommit(statuses));
            assertWriteStatTotals(statuses, 200L, 100L, 100L);

            // Every written record delegate must carry a new location.
            statuses.forEach(writeStatus -> writeStatus.getWrittenRecordDelegates().forEach(r -> org.junit.jupiter.api.Assertions.assertTrue(r.getNewLocation().isPresent())));
        }
    }

    /**
     * Counts the statuses whose stat reports the string {@code "null"} as previous commit,
     * comparing by value. A Java {@code null} previous commit is not counted.
     */
    private static long countWithoutPreviousCommit(List<WriteStatus> statuses) {
        return statuses.stream().filter(status -> "null".equals(status.getStat().getPrevCommit())).count();
    }

    /**
     * Asserts the summed write, update-write and insert counters over all statuses.
     * Fails on an empty status list so an empty result cannot pass as "0 writes".
     */
    private static void assertWriteStatTotals(List<WriteStatus> statuses, long expectedWrites, long expectedUpdateWrites, long expectedInserts) {
        org.junit.jupiter.api.Assertions.assertFalse(statuses.isEmpty());
        org.junit.jupiter.api.Assertions.assertEquals(expectedWrites, statuses.stream().mapToLong(status -> status.getStat().getNumWrites()).sum());
        org.junit.jupiter.api.Assertions.assertEquals(expectedUpdateWrites, statuses.stream().mapToLong(status -> status.getStat().getNumUpdateWrites()).sum());
        org.junit.jupiter.api.Assertions.assertEquals(expectedInserts, statuses.stream().mapToLong(status -> status.getStat().getNumInserts()).sum());
    }

    /**
     * Reads the table through {@code HoodieClientTestUtils.read}, passing one
     * {@code <basePath>/<partition>/*} path per partition known to the data generator.
     *
     * @return the dataset returned by the read helper
     */
    private Dataset<Row> getRecords() {
        String[] partitionGlobs = Stream.of(dataGen.getPartitionPaths())
            .map(partition -> Paths.get(basePath, partition, "*").toString())
            .toArray(String[]::new);
        return HoodieClientTestUtils.read(jsc, basePath, sqlContext, storage, partitionGlobs);
    }

    /**
     * Creates the write-config builder shared by the tests in this class: trip-record Avro schema,
     * parallelism of 2 for all operations, 1 MiB limits for small-file, HFile and Parquet sizes,
     * BLOOM index, table name {@code test-trip-table} and {@link TestWriteStatus} as status class.
     *
     * @return a builder that callers may customise further before calling {@code build()}
     */
    protected HoodieWriteConfig.Builder getConfigBuilder() {
        final long oneMebibyte = 1024L * 1024L;
        String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(oneMebibyte)
            .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(oneMebibyte)
            .parquetMaxFileSize(oneMebibyte)
            .build();
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM)
            .build();

        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(tripSchema)
            .withParallelism(2, 2)
            .withDeleteParallelism(2)
            .withCompactionConfig(compactionConfig)
            .withStorageConfig(storageConfig)
            .forTable("test-trip-table")
            .withIndexConfig(indexConfig)
            .withBulkInsertParallelism(2)
            .withWriteStatusClass(TestWriteStatus.class);
    }

    /**
     * Argument source for the parameterized tests: every combination of disk map type
     * (BITCASK, ROCKS_DB) and compression flag, with the uncompressed pairs first.
     *
     * @return four argument sets of the form {@code (DiskMapType, boolean)}
     */
    private static Stream<Arguments> testArguments() {
        return Stream.of(false, true)
            .flatMap(compressionEnabled -> Stream
                .of(ExternalSpillableMap.DiskMapType.BITCASK, ExternalSpillableMap.DiskMapType.ROCKS_DB)
                .map(mapType -> Arguments.arguments(mapType, compressionEnabled)));
    }

    /**
     * {@link WriteStatus} variant used by these tests. It always passes {@code true} as the first
     * constructor argument of the superclass, regardless of the value it is given.
     */
    public static class TestWriteStatus
    extends WriteStatus {
        /**
         * @param trackSuccessRecords ignored; {@code true} is always forwarded instead
         * @param failureFraction     forwarded unchanged to the superclass
         */
        public TestWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
            super(true, failureFraction);
        }
    }
}

