/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution.bulkinsert;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.ExceptionUtil;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class TestRDDSimpleBucketBulkInsertPartitioner
extends HoodieSparkClientTestHarness {
    private static final List<Object> TABLE_TYPES = Arrays.asList("COPY_ON_WRITE", "MERGE_ON_READ");

    @BeforeEach
    public void setUp() throws Exception {
        this.initPath();
        this.initSparkContexts("TestRDDSimpleBucketPartitioner");
        this.initHoodieStorage();
        this.initTimelineService();
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupResources();
    }

    @ParameterizedTest
    @MethodSource(value={"configParams"})
    public void testSimpleBucketPartitioner(String tableType, boolean partitionSort) throws IOException {
        HoodieTestUtils.init((StorageConfiguration)HoodieTestUtils.getDefaultStorageConf(), (String)this.basePath, (HoodieTableType)HoodieTableType.valueOf((String)tableType));
        int bucketNum = 10;
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").build();
        config.setValue(HoodieIndexConfig.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        config.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE, HoodieIndex.BucketIndexEngineType.SIMPLE.name());
        config.setValue(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, "_row_key");
        config.setValue(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS, "" + bucketNum);
        if (partitionSort) {
            config.setValue(HoodieWriteConfig.BULK_INSERT_SORT_MODE, BulkInsertSortMode.PARTITION_SORT.name());
        }
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        List records = dataGenerator.generateInserts("0", Integer.valueOf(1000));
        HoodieJavaRDD javaRDD = HoodieJavaRDD.of((List)records, (HoodieSparkEngineContext)this.context, (int)1);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        BulkInsertPartitioner partitioner = BulkInsertInternalPartitionerFactory.get((HoodieTable)table, (HoodieWriteConfig)config);
        JavaRDD repartitionRecords = (JavaRDD)partitioner.repartitionRecords((Object)HoodieJavaRDD.getJavaRDD((HoodieData)javaRDD), 1);
        Assertions.assertEquals((long)((long)bucketNum * javaRDD.map(HoodieRecord::getPartitionPath).distinct().count()), (long)repartitionRecords.getNumPartitions());
        if (partitionSort) {
            repartitionRecords.mapPartitionsWithIndex((Function2 & Serializable)(num, partition) -> {
                ArrayList partitionRecords = new ArrayList();
                partition.forEachRemaining(partitionRecords::add);
                ArrayList<HoodieRecord> sortedRecordList = new ArrayList<HoodieRecord>(partitionRecords);
                sortedRecordList.sort(Comparator.comparing(HoodieRecord::getRecordKey));
                Assertions.assertEquals(sortedRecordList, partitionRecords);
                return partitionRecords.iterator();
            }, false).collect();
        }
        this.getHoodieWriteClient(config).startCommitWithTime("0");
        List writeStatuses = this.getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD((HoodieData)javaRDD), "0").collect();
        HashMap writeStatusesMap = new HashMap();
        writeStatuses.forEach(ws -> writeStatusesMap.put(ws.getFileId(), ws));
        this.getHoodieWriteClient(config).startCommitWithTime("1");
        try {
            List writeStatuses2 = this.getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD((HoodieData)javaRDD), "1").collect();
            writeStatuses2.forEach(ws -> Assertions.assertEquals((long)ws.getTotalRecords(), (long)((WriteStatus)writeStatusesMap.get(ws.getFileId())).getTotalRecords()));
        }
        catch (Exception ex) {
            Assertions.assertEquals((Object)"COPY_ON_WRITE", (Object)tableType);
            Throwable rootExceptionCause = ExceptionUtil.getRootCause((Throwable)ex);
            Assertions.assertInstanceOf(HoodieNotSupportedException.class, (Object)rootExceptionCause);
            Assertions.assertLinesMatch(Collections.singletonList("Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"), Collections.singletonList(rootExceptionCause.getMessage()));
        }
    }

    private static Iterable<Object[]> configParams() {
        ArrayList<Object[]> opts = new ArrayList<Object[]>();
        for (Object tableType : TABLE_TYPES) {
            opts.add(new Object[]{tableType, "true"});
            opts.add(new Object[]{tableType, "false"});
        }
        return opts;
    }
}

