/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tag(value="functional")
class TestHoodieSparkMergeOnReadTableClustering
extends SparkClientFunctionalTestHarness {
    TestHoodieSparkMergeOnReadTableClustering() {
    }

    private static Stream<Arguments> testClustering() {
        return Stream.of(Arguments.of((Object[])new Object[]{false, true, true}), Arguments.of((Object[])new Object[]{true, true, false}), Arguments.of((Object[])new Object[]{true, false, true}), Arguments.of((Object[])new Object[]{true, false, false}), Arguments.of((Object[])new Object[]{false, true, true}), Arguments.of((Object[])new Object[]{false, true, false}), Arguments.of((Object[])new Object[]{false, false, true}), Arguments.of((Object[])new Object[]{false, false, false}));
    }

    @ParameterizedTest
    @MethodSource
    void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean populateMetaFields) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().forTable("test-trip-table").withPath(this.basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 
2).withDeleteParallelism(2).withAutoCommit(true).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10L).withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(0x40000000L).parquetMaxFileSize(0x40000000L).build()).withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder().withEnableBackupForRemoteFileSystemView(false).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClustering(Boolean.valueOf(true)).withInlineClusteringNumCommits(1).build()).withRollbackUsingMarkers(false);
        this.addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
        HoodieWriteConfig cfg = cfgBuilder.build();
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)cfg.getProps());
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(400));
            Stream dataFiles = this.insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
            Assertions.assertTrue((boolean)dataFiles.findAny().isPresent(), (String)"should list the base files we wrote in the delta commit");
            newCommitTime = "002";
            client.startCommitWithTime(newCommitTime);
            dataFiles = this.insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
            Assertions.assertTrue((boolean)dataFiles.findAny().isPresent(), (String)"should list the base files we wrote in the delta commit");
            if (doUpdates) {
                newCommitTime = "003";
                client.startCommitWithTime(newCommitTime);
                records = dataGen.generateUpdates(newCommitTime, Integer.valueOf(100));
                this.updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
            }
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            hoodieTable.getHoodieView().sync();
            List allFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            Assertions.assertEquals((int)(dataGen.getPartitionPaths().length * 2), (int)allFiles.size());
            String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            Assertions.assertEquals((long)allFiles.size(), (long)hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
            this.doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen, clusteringAsRow);
        }
    }

    private static Stream<Arguments> testClusteringWithNoBaseFiles() {
        return Stream.of(Arguments.of((Object[])new Object[]{true, true, false}), Arguments.of((Object[])new Object[]{true, false, false}), Arguments.of((Object[])new Object[]{false, true, false}), Arguments.of((Object[])new Object[]{false, false, false}), Arguments.of((Object[])new Object[]{true, true, true}));
    }

    @ParameterizedTest
    @MethodSource
    void testClusteringWithNoBaseFiles(boolean clusteringAsRow, boolean doUpdates, boolean shouldWriteRecordPositions) throws Exception {
        HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().forTable("test-trip-table").withPath(this.basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 
2).withDeleteParallelism(2).withAutoCommit(true).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10L).withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(0x40000000L).parquetMaxFileSize(0x40000000L).build()).withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder().withEnableBackupForRemoteFileSystemView(false).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).withWriteRecordPositionsEnabled(shouldWriteRecordPositions).withClusteringConfig(HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0).withInlineClustering(Boolean.valueOf(true)).withInlineClusteringNumCommits(1).build()).withRollbackUsingMarkers(false);
        HoodieWriteConfig cfg = cfgBuilder.build();
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties)cfg.getProps());
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            String newCommitTime = "001";
            client.startCommitWithTime(newCommitTime);
            List records = dataGen.generateInserts(newCommitTime, Integer.valueOf(400));
            Stream dataFiles = this.insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
            Assertions.assertTrue((!dataFiles.findAny().isPresent() ? 1 : 0) != 0, (String)"should not have any base files");
            newCommitTime = "002";
            client.startCommitWithTime(newCommitTime);
            dataFiles = this.insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
            Assertions.assertTrue((!dataFiles.findAny().isPresent() ? 1 : 0) != 0, (String)"should not have any base files");
            if (doUpdates) {
                newCommitTime = "003";
                client.startCommitWithTime(newCommitTime);
                records = dataGen.generateUpdates(newCommitTime, Integer.valueOf(100));
                this.updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
            }
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            hoodieTable.getHoodieView().sync();
            List allBaseFiles = this.listAllBaseFilesInPath((HoodieTable)hoodieTable);
            Assertions.assertEquals((int)0, (int)allBaseFiles.size());
            String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
            metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
            hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
            Assertions.assertEquals((long)dataGen.getPartitionPaths().length, (long)hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
            this.doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen, clusteringAsRow);
        }
    }

    private void doClusteringAndValidate(SparkRDDWriteClient client, String clusteringCommitTime, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, HoodieTestDataGenerator dataGen, boolean clusteringAsRow) {
        client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), Boolean.toString(clusteringAsRow));
        client.cluster(clusteringCommitTime, true);
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        HoodieSparkTable clusteredTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
        clusteredTable.getHoodieView().sync();
        Stream dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths()).flatMap(arg_0 -> TestHoodieSparkMergeOnReadTableClustering.lambda$doClusteringAndValidate$0((HoodieTable)clusteredTable, arg_0));
        Assertions.assertEquals((long)dataGen.getPartitionPaths().length, (long)dataFilesToRead.count());
        HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
        Assertions.assertEquals((int)1, (int)timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), (String)"Expecting a single commit.");
        Assertions.assertEquals((Object)clusteringCommitTime, (Object)((HoodieInstant)timeline.lastInstant().get()).requestedTime());
        Assertions.assertEquals((Object)"replacecommit", (Object)((HoodieInstant)timeline.lastInstant().get()).getAction());
        if (cfg.populateMetaFields()) {
            Assertions.assertEquals((long)400L, (long)HoodieClientTestUtils.countRecordsOptionallySince((JavaSparkContext)this.jsc(), (String)this.basePath(), (SQLContext)this.sqlContext(), (HoodieTimeline)timeline, (Option)Option.of((Object)"000")), (String)"Must contain 200 records");
        } else {
            Assertions.assertEquals((long)400L, (long)HoodieClientTestUtils.countRecordsOptionallySince((JavaSparkContext)this.jsc(), (String)this.basePath(), (SQLContext)this.sqlContext(), (HoodieTimeline)timeline, (Option)Option.empty()));
        }
    }

    private static /* synthetic */ Stream lambda$doClusteringAndValidate$0(HoodieTable clusteredTable, String p) {
        return clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p);
    }
}

