/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution.bulkinsert;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for the row-based bulk insert partitioners, both those created by
 * {@link BulkInsertInternalPartitionerWithRowsFactory} for each {@link BulkInsertSortMode} and
 * {@link RowCustomColumnsSortPartitioner}.
 *
 * <p>Each case repartitions a dataset spread over three Hudi partition paths into
 * {@link #NUM_OUTPUT_PARTITIONS} Spark partitions and checks the resulting Spark partition count,
 * the ordering guarantee (global or per Spark partition), and that the number of records per Hudi
 * partition path is unchanged.
 */
public class TestBulkInsertInternalPartitionerForRows extends HoodieSparkClientTestHarness {

    /**
     * Ordering used when a test supplies no custom comparator: partition path, then record key,
     * compared as a single {@code "<partitionPath>+<recordKey>"} string.
     */
    private static final Comparator<Row> DEFAULT_KEY_COMPARATOR = Comparator.comparing((Row o) -> o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD));

    /** Number of output Spark partitions requested from every partitioner under test. */
    private static final int NUM_OUTPUT_PARTITIONS = 2;

    /** Avro schema JSON passed to the write config of the factory-created partitioners. */
    private static final String TRIP_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    @BeforeEach
    public void setUp() throws Exception {
        initSparkContexts("TestBulkInsertInternalPartitionerForRows");
        initPath();
        initHoodieStorage();
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    /**
     * Argument rows for the parameterized {@code testBulkInsertInternalPartitioner}. Every row has
     * exactly six values, one per test-method parameter.
     *
     * @return one {@link Arguments} per configuration
     */
    private static Stream<Arguments> configParams() {
        Object[][] data = new Object[][]{
            // sortMode, isTablePartitioned, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, populateMetaFields
            {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true},
            {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true},
            {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, true},
            {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false, true},
            {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, true},
            {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false, true},
            {BulkInsertSortMode.NONE, true, true, false, false, true},
            {BulkInsertSortMode.NONE, true, false, false, false, true},
            {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false},
            {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false},
            {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, false},
            {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, false}
        };
        return Stream.of(data).map(Arguments::of);
    }

    /**
     * Runs the factory-created partitioner for {@code sortMode} against the generated test records.
     *
     * @param sortMode                   bulk insert sort mode configured on the write config
     * @param isTablePartitioned         passed through to the partitioner factory
     * @param enforceNumOutputPartitions passed through to the factory; when set (and the mode is not
     *                                   globally sorted) the output must have exactly
     *                                   {@link #NUM_OUTPUT_PARTITIONS} partitions
     * @param isGloballySorted           whether the collected output must be fully sorted
     * @param isLocallySorted            whether each output Spark partition must be sorted
     * @param populateMetaFields         when false, repartitioning is expected to throw
     */
    @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2} populateMetaFields={5}")
    @MethodSource("configParams")
    public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted, boolean populateMetaFields) {
        Dataset<Row> records = generateTestRecords();
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/")
            .withSchema(TRIP_SCHEMA)
            .withBulkInsertSortMode(sortMode.name())
            .withPopulateMetaFields(populateMetaFields)
            .build();
        testBulkInsertInternalPartitioner(
            BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
            records,
            enforceNumOutputPartitions,
            isGloballySorted,
            isLocallySorted,
            generateExpectedPartitionNumRecords(records),
            Option.empty(),
            populateMetaFields);
    }

    /**
     * Checks {@link RowCustomColumnsSortPartitioner} sorting by a single data column, with and
     * without the record key suffixed to the sort columns.
     *
     * @param suffixRecordKey value for {@code BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS}
     */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testCustomColumnSortPartitionerWithRows(boolean suffixRecordKey) {
        Dataset<Row> records = generateTestRecords();
        String sortColumnString = records.columns()[6];
        String[] sortColumns = sortColumnString.split(",");
        Comparator<Row> comparator = getCustomColumnComparator(sortColumns);

        TypedProperties properties = new TypedProperties();
        properties.setProperty(HoodieWriteConfig.BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS.key(), String.valueOf(suffixRecordKey));
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/")
            .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
            .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
            .withProperties(properties)
            .build();

        // The expected per-partition-path counts depend only on the input, so compute them once.
        Map<String, Long> expectedPartitionNumRecords = generateExpectedPartitionNumRecords(records);
        // Exercise both constructors: one given the sort columns directly, and one given only the
        // write config, which carries them via withUserDefinedBulkInsertPartitionerSortColumns.
        testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns, config), records, true, true, true, expectedPartitionNumRecords, Option.of(comparator), true);
        testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), records, true, true, true, expectedPartitionNumRecords, Option.of(comparator), true);
    }

    /**
     * Repartitions {@code rows} into {@link #NUM_OUTPUT_PARTITIONS} partitions with
     * {@code partitioner} and verifies the result.
     *
     * @param partitioner                 partitioner under test
     * @param rows                        input records
     * @param enforceNumOutputPartitions  if true (and not globally sorted), the output must have exactly
     *                                    the requested number of partitions; otherwise it must keep
     *                                    the input's partition count
     * @param isGloballySorted            verify the whole collected output is ordered by the comparator;
     *                                    also relaxes the partition-count check to an upper bound
     * @param isLocallySorted             verify each output Spark partition is ordered (ignored when
     *                                    {@code isGloballySorted} is set)
     * @param expectedPartitionNumRecords expected record count per Hudi partition path
     * @param comparator                  ordering to verify against; {@link #DEFAULT_KEY_COMPARATOR}
     *                                    when empty
     * @param populateMetaFields          when false, only asserts that repartitioning throws
     *                                    {@link HoodieException}
     */
    @SuppressWarnings("unchecked")
    private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, Dataset<Row> rows, boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, Option<Comparator<Row>> comparator, boolean populateMetaFields) {
        int numPartitions = NUM_OUTPUT_PARTITIONS;
        if (!populateMetaFields) {
            // The row partitioners are expected to reject this configuration outright.
            Assertions.assertThrows(HoodieException.class, () -> partitioner.repartitionRecords(rows, numPartitions));
            return;
        }

        Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
        int actualNumPartitions = actualRecords.rdd().getNumPartitions();
        if (isGloballySorted) {
            // Only an upper bound is checked for globally sorted output.
            Assertions.assertTrue(actualNumPartitions <= numPartitions,
                "expected at most " + numPartitions + " partitions but got " + actualNumPartitions);
        } else {
            int expectedNumPartitions = enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions();
            Assertions.assertEquals(expectedNumPartitions, actualNumPartitions);
        }

        List<Row> collectedActualRecords = actualRecords.collectAsList();
        if (isGloballySorted) {
            verifyRowsAscendingOrder(collectedActualRecords, comparator);
        } else if (isLocallySorted) {
            // Bring each output partition back to the driver and check its order there. A bare
            // mapPartitions(...) is a lazy transformation: with no action it never executes, so the
            // per-partition check would be silently skipped. Checking on the driver also avoids
            // serializing this test instance and the comparator into Spark tasks.
            List<List<Row>> rowsPerPartition = actualRecords.javaRDD().glom().collect();
            for (List<Row> partitionRows : rowsPerPartition) {
                verifyRowsAscendingOrder(partitionRows, comparator);
            }
        }

        Map<String, Long> actualPartitionNumRecords = new HashMap<>();
        for (Row record : collectedActualRecords) {
            String partitionPath = record.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
            actualPartitionNumRecords.merge(partitionPath, 1L, Long::sum);
        }
        Assertions.assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
    }

    /**
     * Counts records per Hudi partition path.
     *
     * @param rows records carrying the partition-path meta field
     * @return map from partition path to its record count
     */
    public static Map<String, Long> generateExpectedPartitionNumRecords(Dataset<Row> rows) {
        List<Row> counts = rows.groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
        Map<String, Long> returnMap = new HashMap<>();
        for (Row row : counts) {
            String partitionPath = row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
            Long count = row.getAs("count");
            returnMap.put(partitionPath, count);
        }
        return returnMap;
    }

    /**
     * Builds the test input: 100, 150 and 200 random rows in partition paths
     * {@code 2016/03/15}, {@code 2015/03/16} and {@code 2015/03/17} respectively.
     *
     * @return the union of the three datasets
     */
    public Dataset<Row> generateTestRecords() {
        Dataset<Row> rowsPart1 = SparkDatasetTestUtils.getRandomRows(sqlContext, 100, "2016/03/15", false);
        Dataset<Row> rowsPart2 = SparkDatasetTestUtils.getRandomRows(sqlContext, 150, "2015/03/16", false);
        Dataset<Row> rowsPart3 = SparkDatasetTestUtils.getRandomRows(sqlContext, 200, "2015/03/17", false);
        return rowsPart1.union(rowsPart2).union(rowsPart3);
    }

    /**
     * Asserts {@code records} is already in ascending order under {@code comparator}, falling back
     * to {@link #DEFAULT_KEY_COMPARATOR}. The sort is stable, so ties keep their original order.
     */
    private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
        List<Row> expectedRecords = new ArrayList<>(records);
        expectedRecords.sort(comparator.orElse(DEFAULT_KEY_COMPARATOR));
        Assertions.assertEquals(expectedRecords, records);
    }

    /**
     * Builds a comparator over the partition path followed by the string values of
     * {@code sortColumns}, concatenated with no separator.
     */
    private Comparator<Row> getCustomColumnComparator(String[] sortColumns) {
        return Comparator.comparing((Row row) -> {
            StringBuilder sb = new StringBuilder((String) row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
            for (String col : sortColumns) {
                sb.append(row.getAs(col).toString());
            }
            return sb.toString();
        });
    }
}

