/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy;
import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag(value="functional")
public class TestSparkConsistentBucketClustering
extends HoodieSparkClientTestHarness {
    private HoodieWriteConfig config;
    private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0L);

    public void setup(int maxFileSize) throws IOException {
        this.setup(maxFileSize, Collections.emptyMap());
    }

    public void setup(int maxFileSize, Map<String, String> options) throws IOException {
        this.setup(maxFileSize, options, false);
    }

    public void setup(int maxFileSize, Map<String, String> options, boolean singleJob) throws IOException {
        this.initPath();
        this.initSparkContexts();
        this.initTestDataGenerator();
        this.initHoodieStorage();
        Properties props = TestSparkConsistentBucketClustering.getPropertiesForKeyGen((boolean)true);
        props.putAll(options);
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        this.metaClient = HoodieTestUtils.init((StorageConfiguration)this.storageConf, (String)this.basePath, (HoodieTableType)HoodieTableType.MERGE_ON_READ, (Properties)props);
        this.config = this.getConfigBuilder().withProps((Map)props).withAutoCommit(false).withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).withBucketNum("8").withBucketMaxNum(14).withBucketMinNum(4).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize((long)maxFileSize).build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withClusteringPlanStrategyClass(SparkConsistentBucketClusteringPlanStrategy.class.getName()).withClusteringExecutionStrategyClass(singleJob ? "org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy" : "org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy").withClusteringUpdatesStrategy(SparkConsistentBucketDuplicateUpdateStrategy.class.getName()).build()).build();
        this.writeClient = this.getHoodieWriteClient(this.config);
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupResources();
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single) throws IOException {
        int maxFileSize = isSplit ? 5120 : 0x8000000;
        int targetBucketNum = isSplit ? 14 : 4;
        this.setup(maxFileSize, Collections.emptyMap(), single);
        this.config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable));
        this.config.setValue("hoodie.metadata.enable", "false");
        this.writeData(this.writeClient.createNewInstantTime(), 2000, true);
        String clusteringTime = (String)this.writeClient.scheduleClustering(Option.empty()).get();
        this.writeClient.cluster(clusteringTime, true);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)this.config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        org.junit.jupiter.api.Assertions.assertEquals((int)2000, (int)this.readRecords().size());
        Arrays.stream(this.dataGen.getPartitionPaths()).forEach(arg_0 -> TestSparkConsistentBucketClustering.lambda$testResizing$1((HoodieTable)table, targetBucketNum, arg_0));
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testLoadMetadata(boolean isCommitFilePresent, boolean rowWriterEnable, boolean single) throws IOException {
        int maxFileSize = 5120;
        int targetBucketNum = 14;
        this.setup(5120, Collections.emptyMap(), single);
        this.writeClient.getConfig().setValue(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
        this.writeClient.getConfig().setValue(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4");
        this.writeClient.getConfig().setValue(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
        this.writeClient.getConfig().setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable));
        this.writeData(this.writeClient.createNewInstantTime(), 2000, true);
        String clusteringTime = (String)this.writeClient.scheduleClustering(Option.empty()).get();
        this.writeClient.cluster(clusteringTime, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)this.config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        this.writeClient.clean();
        TimelineArchiverV2 hoodieTimelineArchiver = new TimelineArchiverV2(this.writeClient.getConfig(), (HoodieTable)table);
        hoodieTimelineArchiver.archiveIfRequired((HoodieEngineContext)this.context);
        Arrays.stream(this.dataGen.getPartitionPaths()).forEach(arg_0 -> TestSparkConsistentBucketClustering.lambda$testLoadMetadata$3(isCommitFilePresent, (HoodieTable)table, arg_0));
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        this.writeData(this.writeClient.createNewInstantTime(), 10, true);
        org.junit.jupiter.api.Assertions.assertEquals((int)2080, (int)this.readRecords().size());
    }

    @ParameterizedTest
    @MethodSource(value={"configParamsForSorting"})
    public void testClusteringColumnSort(String sortColumn, boolean rowWriterEnable) throws Exception {
        Comparator<Object> comparator;
        HashMap<String, String> options = new HashMap<String, String>();
        if (sortColumn.equals("_row_key")) {
            options.put(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.PARTITION_SORT.toString());
        } else {
            options.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), sortColumn);
        }
        options.put("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable));
        this.setup(0x8000000, options);
        this.writeData(this.writeClient.createNewInstantTime(), 500, true);
        this.writeData(this.writeClient.createNewInstantTime(), 500, true);
        String clusteringTime = (String)this.writeClient.scheduleClustering(Option.empty()).get();
        this.writeClient.cluster(clusteringTime, true);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        List<Row> rows = this.readRecords();
        org.junit.jupiter.api.Assertions.assertEquals((int)1000, (int)rows.size());
        StructType schema = rows.get(0).schema();
        Schema rawSchema = AvroConversionUtils.convertStructTypeToAvroSchema((DataType)schema, (String)"test_struct_name", (String)"test_namespace");
        Schema.Field field = rawSchema.getField(sortColumn);
        Schema.Field fileNameFiled = rawSchema.getField(HoodieRecord.FILENAME_METADATA_FIELD);
        if (field.schema().getType() == Schema.Type.DOUBLE) {
            comparator = Comparator.comparingDouble(row -> (Double)((Row)row).get(field.pos()));
        } else if (field.schema().getType() == Schema.Type.STRING) {
            comparator = Comparator.comparing(row -> ((Row)row).get(field.pos()).toString());
        } else {
            throw new HoodieException("Cannot get comparator: unsupported data type, " + field.schema().getType());
        }
        Row lastRow = null;
        String lastFileName = null;
        for (Row row2 : rows) {
            String currentFileName = row2.get(fileNameFiled.pos()).toString();
            if (lastFileName != null && currentFileName.equals(lastFileName) && lastRow != null) {
                org.junit.jupiter.api.Assertions.assertTrue((lastRow == null || comparator.compare(lastRow, row2) <= 0 ? 1 : 0) != 0, (String)("The rows are not sorted based on the column: " + sortColumn));
            }
            lastRow = row2;
            lastFileName = currentFileName;
        }
    }

    @Test
    public void testConcurrentClustering() throws IOException {
        this.setup(5120);
        this.writeData(this.writeClient.createNewInstantTime(), 2000, true);
        String clusteringTime = (String)this.writeClient.scheduleClustering(Option.empty()).get();
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.writeClient.scheduleClustering(Option.empty()).isPresent());
        this.writeClient.cluster(clusteringTime, true);
        this.config.setValue(HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS, "1");
        this.config.setValue(HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, "0");
        this.config.setValue("hoodie.clustering.plan.partition.filter.mode", ClusteringPlanPartitionFilterMode.RECENT_DAYS.toString());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.writeClient.scheduleClustering(Option.empty()).isPresent());
        this.config.setValue(HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS, "1");
        this.config.setValue(HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, "1");
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.writeClient.scheduleClustering(Option.empty()).isPresent());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testConcurrentWrite(boolean rowWriterEnable) throws IOException {
        this.setup(5120);
        this.config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable));
        String writeTime = this.writeClient.createNewInstantTime();
        List<WriteStatus> writeStatues = this.writeData(writeTime, 2000, false);
        org.junit.jupiter.api.Assertions.assertFalse((boolean)this.writeClient.scheduleClustering(Option.empty()).isPresent());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)this.writeClient.commitStats(writeTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType()));
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        String clusteringTime = (String)this.writeClient.scheduleClustering(Option.empty()).get();
        this.writeData(this.writeClient.createNewInstantTime(), 2000, true);
        org.junit.jupiter.api.Assertions.assertEquals((int)4000, (int)this.readRecords().size());
        this.writeClient.cluster(clusteringTime, true);
        org.junit.jupiter.api.Assertions.assertEquals((int)4000, (int)this.readRecords().size());
    }

    private List<Row> readRecords() {
        Dataset roViewDF = this.sparkSession.read().format("hudi").load(this.basePath);
        roViewDF.createOrReplaceTempView("hudi_ro_table");
        return this.sparkSession.sqlContext().sql("select * from hudi_ro_table").collectAsList();
    }

    private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean doCommit) {
        List records = this.dataGen.generateInserts(commitTime, Integer.valueOf(totalRecords));
        JavaRDD writeRecords = this.jsc.parallelize(records, 2);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        this.writeClient.startCommitWithTime(commitTime);
        List writeStatues = this.writeClient.upsert(writeRecords, commitTime).collect();
        Assertions.assertNoWriteErrors((List)writeStatues);
        if (doCommit) {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)this.writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), this.metaClient.getCommitActionType()));
        }
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        return writeStatues;
    }

    public HoodieWriteConfig.Builder getConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withWriteStatusClass(MetadataMergeWriteStatus.class).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0x100000L).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(0x100000L).parquetMaxFileSize(0x100000L).build()).forTable("test-trip-table").withEmbeddedTimelineServerEnabled(true);
    }

    private static Stream<Arguments> configParams() {
        return Stream.of(Arguments.of((Object[])new Object[]{true, false, true}), Arguments.of((Object[])new Object[]{false, false, true}), Arguments.of((Object[])new Object[]{true, true, true}), Arguments.of((Object[])new Object[]{false, true, true}), Arguments.of((Object[])new Object[]{true, false, false}), Arguments.of((Object[])new Object[]{false, false, false}), Arguments.of((Object[])new Object[]{true, true, false}), Arguments.of((Object[])new Object[]{false, true, false}));
    }

    private static Stream<Arguments> configParamsForSorting() {
        return Stream.of(Arguments.of((Object[])new Object[]{"begin_lat", true}), Arguments.of((Object[])new Object[]{"begin_lat", false}), Arguments.of((Object[])new Object[]{"_row_key", false}), Arguments.of((Object[])new Object[]{"_row_key", true}));
    }

    private static /* synthetic */ void lambda$testLoadMetadata$3(boolean isCommitFilePresent, HoodieTable table, String p) {
        if (!isCommitFilePresent) {
            StoragePath metadataPath = FSUtils.constructAbsolutePath((String)table.getMetaClient().getHashingMetadataPath(), (String)p);
            try {
                table.getStorage().listDirectEntries(metadataPath).forEach(fl -> {
                    if (fl.getPath().getName().contains(".commit")) {
                        try {
                            table.getStorage().deleteDirectory(fl.getPath());
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        HoodieConsistentHashingMetadata metadata = (HoodieConsistentHashingMetadata)ConsistentBucketIndexUtils.loadMetadata((HoodieTable)table, (String)p).get();
        org.junit.jupiter.api.Assertions.assertEquals((int)14, (int)metadata.getNodes().size());
    }

    private static /* synthetic */ void lambda$testResizing$1(HoodieTable table, int targetBucketNum, String p) {
        HoodieConsistentHashingMetadata metadata = (HoodieConsistentHashingMetadata)ConsistentBucketIndexUtils.loadMetadata((HoodieTable)table, (String)p).get();
        org.junit.jupiter.api.Assertions.assertEquals((int)targetBucketNum, (int)metadata.getNodes().size());
        table.getSliceView().getLatestFileSlices(p).forEach(fs -> {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)fs.getBaseFile().isPresent());
            org.junit.jupiter.api.Assertions.assertTrue((fs.getLogFiles().count() == 0L ? 1 : 0) != 0);
        });
    }
}

