/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.cluster.ClusteringTestUtils;
import org.apache.hudi.table.action.commit.AverageRecordSizeUtils;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.InsertBucket;
import org.apache.hudi.table.action.commit.InsertBucketCumulativeWeightPair;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class TestUpsertPartitioner
extends HoodieClientTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestUpsertPartitioner.class);
    private static final Schema SCHEMA = SchemaTestUtil.getSchemaFromResource(TestUpsertPartitioner.class, (String)"/exampleSchema.avsc");

    private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, String testPartitionPath, boolean autoSplitInserts) throws Exception {
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize((long)smallFileSize).insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024000L).parquetMaxFileSize(1024000L).orcMaxFileSize(1024000L).build()).build();
        FileCreateUtils.createCommit((String)this.basePath, (String)"001");
        FileCreateUtils.createBaseFile((String)this.basePath, (String)testPartitionPath, (String)"001", (String)"file1", (long)fileSize);
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        List insertRecords = dataGenerator.generateInserts("001", Integer.valueOf(numInserts));
        List updateRecords = dataGenerator.generateUpdates("001", Integer.valueOf(numUpdates));
        for (HoodieRecord updateRec : updateRecords) {
            updateRec.unseal();
            updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
            updateRec.seal();
        }
        ArrayList records = new ArrayList();
        records.addAll(insertRecords);
        records.addAll(updateRecords);
        WorkloadProfile profile = new WorkloadProfile(TestUpsertPartitioner.buildProfile((JavaRDD<HoodieRecord>)this.jsc.parallelize(records)));
        UpsertPartitioner partitioner = new UpsertPartitioner(profile, (HoodieEngineContext)this.context, (HoodieTable)table, config);
        Assertions.assertEquals((int)0, (int)partitioner.getPartition((Object)new Tuple2((Object)((HoodieRecord)updateRecords.get(0)).getKey(), (Object)Option.ofNullable((Object)((HoodieRecord)updateRecords.get(0)).getCurrentLocation()))), (String)"Update record should have gone to the 1 update partition");
        return partitioner;
    }

    private static List<HoodieInstant> setupHoodieInstants() {
        ArrayList<HoodieInstant> instants = new ArrayList<HoodieInstant>();
        instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "ts1"));
        instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "ts2"));
        instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "ts3"));
        instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "ts4"));
        instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "ts5"));
        Collections.reverse(instants);
        return instants;
    }

    private static List<HoodieWriteStat> generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) {
        List writeStatsList = HoodieTestUtils.generateFakeHoodieWriteStat((int)5);
        for (int i = 0; i < writeStatsList.size() - 1; ++i) {
            HoodieWriteStat writeStat = (HoodieWriteStat)writeStatsList.get(i);
            writeStat.setNumWrites(0L);
            writeStat.setTotalWriteBytes(0L);
        }
        HoodieWriteStat lastWriteStat = (HoodieWriteStat)writeStatsList.get(writeStatsList.size() - 1);
        lastWriteStat.setTotalWriteBytes((long)totalBytesWritten);
        lastWriteStat.setNumWrites((long)totalRecordsWritten);
        return writeStatsList;
    }

    private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) {
        List<HoodieWriteStat> fakeHoodieWriteStats = TestUpsertPartitioner.generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
        HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
        fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
        return commitMetadata;
    }

    private static LinkedList<Option<byte[]>> generateCommitMetadataList() throws IOException {
        LinkedList<Option<byte[]>> commits = new LinkedList<Option<byte[]>>();
        commits.push(Option.of((Object)StringUtils.getUTF8Bytes((String)TestUpsertPartitioner.generateCommitMetadataWith(2000, 10000).toJsonString())));
        commits.push(Option.of((Object)StringUtils.getUTF8Bytes((String)TestUpsertPartitioner.generateCommitMetadataWith(1500, 7500).toJsonString())));
        commits.push(Option.of((Object)StringUtils.getUTF8Bytes((String)TestUpsertPartitioner.generateCommitMetadataWith(100, 500).toJsonString())));
        commits.push(Option.of((Object)StringUtils.getUTF8Bytes((String)TestUpsertPartitioner.generateCommitMetadataWith(0, 0).toJsonString())));
        commits.push(Option.of((Object)StringUtils.getUTF8Bytes((String)TestUpsertPartitioner.generateCommitMetadataWith(0, 1500).toJsonString())));
        commits.push(Option.of((Object)StringUtils.getUTF8Bytes((String)TestUpsertPartitioner.generateCommitMetadataWith(2500, 0).toJsonString())));
        return commits;
    }

    @Test
    public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exception {
        HoodieTimeline commitTimeLine = (HoodieTimeline)Mockito.mock(HoodieTimeline.class);
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000L).build()).build();
        Mockito.when((Object)commitTimeLine.empty()).thenReturn((Object)false);
        Mockito.when((Object)commitTimeLine.getReverseOrderedInstants()).thenReturn(TestUpsertPartitioner.setupHoodieInstants().stream());
        LinkedList<Option<byte[]>> commits = TestUpsertPartitioner.generateCommitMetadataList();
        Mockito.when((Object)commitTimeLine.getInstantDetails((HoodieInstant)ArgumentMatchers.any(HoodieInstant.class))).thenAnswer(invocationOnMock -> (Option)commits.pop());
        long expectAvgSize = (long)Math.ceil(5.0);
        long actualAvgSize = AverageRecordSizeUtils.averageBytesPerRecord((HoodieTimeline)commitTimeLine, (HoodieWriteConfig)config);
        Assertions.assertEquals((long)expectAvgSize, (long)actualAvgSize);
    }

    @Test
    public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception {
        HoodieTimeline commitTimeLine = (HoodieTimeline)Mockito.mock(HoodieTimeline.class);
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().build();
        Mockito.when((Object)commitTimeLine.empty()).thenReturn((Object)true);
        long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
        long actualAvgSize = AverageRecordSizeUtils.averageBytesPerRecord((HoodieTimeline)commitTimeLine, (HoodieWriteConfig)config);
        Assertions.assertEquals((long)expectAvgSize, (long)actualAvgSize);
    }

    @Test
    public void testUpsertPartitioner() throws Exception {
        String testPartitionPath = "2016/09/26";
        UpsertPartitioner partitioner = this.getUpsertPartitioner(0, 200, 100, 1024, "2016/09/26", false);
        List insertBuckets = partitioner.getInsertBuckets("2016/09/26");
        Assertions.assertEquals((int)2, (int)insertBuckets.size(), (String)"Total of 2 insert buckets");
    }

    @Test
    public void testUpsertPartitionerWithRecordsPerBucket() throws Exception {
        String testPartitionPath = "2016/09/26";
        UpsertPartitioner partitioner = this.getUpsertPartitioner(0, 250, 100, 1024, "2016/09/26", false);
        List insertBuckets = partitioner.getInsertBuckets("2016/09/26");
        int insertSplitSize = partitioner.config.getCopyOnWriteInsertSplitSize();
        int remainedInsertSize = 250 - 2 * insertSplitSize;
        Assertions.assertEquals((int)3, (int)insertBuckets.size(), (String)"Total of 3 insert buckets");
        Assertions.assertEquals((double)0.4, (double)((InsertBucketCumulativeWeightPair)insertBuckets.get((int)0)).getLeft().weight, (String)("insert " + insertSplitSize + " records"));
        Assertions.assertEquals((double)0.4, (double)((InsertBucketCumulativeWeightPair)insertBuckets.get((int)1)).getLeft().weight, (String)("insert " + insertSplitSize + " records"));
        Assertions.assertEquals((double)0.2, (double)((InsertBucketCumulativeWeightPair)insertBuckets.get((int)2)).getLeft().weight, (String)("insert " + remainedInsertSize + " records"));
    }

    @Test
    public void testPartitionWeight() throws Exception {
        String testPartitionPath = "2016/09/26";
        int totalInsertNum = 2000;
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0L).insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();
        FileCreateUtils.createCommit((String)this.basePath, (String)"001");
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        List insertRecords = dataGenerator.generateInserts("001", Integer.valueOf(totalInsertNum));
        WorkloadProfile profile = new WorkloadProfile(TestUpsertPartitioner.buildProfile((JavaRDD<HoodieRecord>)this.jsc.parallelize(insertRecords)));
        UpsertPartitioner partitioner = new UpsertPartitioner(profile, (HoodieEngineContext)this.context, (HoodieTable)table, config);
        List insertBuckets = partitioner.getInsertBuckets("2016/09/26");
        float bucket0Weight = 0.2f;
        InsertBucketCumulativeWeightPair pair = (InsertBucketCumulativeWeightPair)insertBuckets.remove(0);
        ((InsertBucket)pair.getKey()).weight = bucket0Weight;
        pair.setValue(new Double(bucket0Weight));
        insertBuckets.add(0, pair);
        InsertBucketCumulativeWeightPair pair1 = (InsertBucketCumulativeWeightPair)insertBuckets.remove(1);
        ((InsertBucket)pair1.getKey()).weight = 1.0f - bucket0Weight;
        pair1.setValue(new Double(1.0));
        insertBuckets.add(1, pair1);
        HashMap<Integer, Integer> partition2numRecords = new HashMap<Integer, Integer>();
        for (HoodieRecord hoodieRecord : insertRecords) {
            int partition = partitioner.getPartition((Object)new Tuple2((Object)hoodieRecord.getKey(), (Object)Option.ofNullable((Object)hoodieRecord.getCurrentLocation())));
            if (!partition2numRecords.containsKey(partition)) {
                partition2numRecords.put(partition, 0);
            }
            partition2numRecords.put(partition, (Integer)partition2numRecords.get(partition) + 1);
        }
        Assertions.assertTrue(((Integer)partition2numRecords.get(0) < (Integer)partition2numRecords.get(1) ? 1 : 0) != 0, (String)"The insert num of bucket1 should more than bucket0");
        Assertions.assertTrue(((Integer)partition2numRecords.get(0) + (Integer)partition2numRecords.get(1) == totalInsertNum ? 1 : 0) != 0, (String)("The total insert records should be " + totalInsertNum));
        Assertions.assertEquals((Object)String.valueOf(bucket0Weight), (Object)String.format("%.1f", Float.valueOf((float)((Integer)partition2numRecords.get(0)).intValue() * 1.0f / (float)totalInsertNum)), (String)("The weight of bucket0 should be " + bucket0Weight));
        Assertions.assertEquals((Object)String.valueOf(1.0f - bucket0Weight), (Object)String.format("%.1f", Float.valueOf((float)((Integer)partition2numRecords.get(1)).intValue() * 1.0f / (float)totalInsertNum)), (String)("The weight of bucket1 should be " + (1.0f - bucket0Weight)));
    }

    private void assertInsertBuckets(Double[] weights, Double[] cumulativeWeights, List<InsertBucketCumulativeWeightPair> insertBuckets) {
        for (int i = 0; i < weights.length; ++i) {
            Assertions.assertEquals((int)i, (int)((InsertBucket)insertBuckets.get((int)i).getKey()).bucketNumber, (String)String.format("BucketNumber of insert bucket %d must be same as %d", i, i));
            Assertions.assertEquals((double)weights[i], (double)((InsertBucket)insertBuckets.get((int)i).getKey()).weight, (double)0.01, (String)String.format("Insert bucket %d should have weight %.1f", i, weights[i]));
            Assertions.assertEquals((double)cumulativeWeights[i], (double)((Double)insertBuckets.get(i).getValue()), (double)0.01, (String)String.format("Insert bucket %d should have cumulativeWeight %.1f", i, cumulativeWeights[i]));
        }
    }

    @Test
    public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
        String testPartitionPath = "2016/09/26";
        UpsertPartitioner partitioner = this.getUpsertPartitioner(1024000, 400, 100, 819200, "2016/09/26", false);
        List insertBuckets = partitioner.getInsertBuckets("2016/09/26");
        Assertions.assertEquals((int)3, (int)partitioner.numPartitions(), (String)"Should have 3 partitions");
        Assertions.assertEquals((Object)BucketType.UPDATE, (Object)partitioner.getBucketInfo((int)0).bucketType, (String)"Bucket 0 is UPDATE");
        Assertions.assertEquals((Object)BucketType.INSERT, (Object)partitioner.getBucketInfo((int)1).bucketType, (String)"Bucket 1 is INSERT");
        Assertions.assertEquals((Object)BucketType.INSERT, (Object)partitioner.getBucketInfo((int)2).bucketType, (String)"Bucket 2 is INSERT");
        Assertions.assertEquals((int)3, (int)insertBuckets.size(), (String)"Total of 3 insert buckets");
        Double[] weights = new Double[]{0.5, 0.25, 0.25};
        Double[] cumulativeWeights = new Double[]{0.5, 0.75, 1.0};
        this.assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
        partitioner = this.getUpsertPartitioner(1024000, 2400, 100, 819200, "2016/09/26", true);
        insertBuckets = partitioner.getInsertBuckets("2016/09/26");
        Assertions.assertEquals((int)4, (int)partitioner.numPartitions(), (String)"Should have 4 partitions");
        Assertions.assertEquals((Object)BucketType.UPDATE, (Object)partitioner.getBucketInfo((int)0).bucketType, (String)"Bucket 0 is UPDATE");
        Assertions.assertEquals((Object)BucketType.INSERT, (Object)partitioner.getBucketInfo((int)1).bucketType, (String)"Bucket 1 is INSERT");
        Assertions.assertEquals((Object)BucketType.INSERT, (Object)partitioner.getBucketInfo((int)2).bucketType, (String)"Bucket 2 is INSERT");
        Assertions.assertEquals((Object)BucketType.INSERT, (Object)partitioner.getBucketInfo((int)3).bucketType, (String)"Bucket 3 is INSERT");
        Assertions.assertEquals((int)4, (int)insertBuckets.size(), (String)"Total of 4 insert buckets");
        weights = new Double[]{0.08, 0.42, 0.42, 0.08};
        cumulativeWeights = new Double[]{0.08, 0.5, 0.92, 1.0};
        this.assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
    }

    @Test
    public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWithCanIndexLogFiles() throws Exception {
        String testPartitionPath = HoodieTestUtils.DEFAULT_PARTITION_PATHS[0];
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024L).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE).withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build()).build()).build();
        HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan((HoodieTableMetaClient)this.metaClient, (String)"001", (String)"002", (int)1, (boolean)true, (boolean)false);
        FileCreateUtils.createRequestedCompactionCommit((String)this.basePath, (String)"002", (HoodieCompactionPlan)plan);
        FileCreateUtils.createBaseFile((String)this.basePath, (String)testPartitionPath, (String)"003", (String)"2", (long)1L);
        FileCreateUtils.createCommit((String)this.basePath, (String)"003");
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        List insertRecords = dataGenerator.generateInserts("004", Integer.valueOf(100));
        WorkloadProfile profile = new WorkloadProfile(TestUpsertPartitioner.buildProfile((JavaRDD<HoodieRecord>)this.jsc.parallelize(insertRecords)));
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, this.context, (HoodieTable)table, config);
        Assertions.assertEquals((int)1, (int)partitioner.numPartitions(), (String)"Should have 1 partitions");
        Assertions.assertEquals((Object)BucketType.UPDATE, (Object)partitioner.getBucketInfo((int)0).bucketType, (String)"Bucket 0 is UPDATE");
        Assertions.assertEquals((Object)"2", (Object)partitioner.getBucketInfo((int)0).fileIdPrefix, (String)"Should be assigned to only file id not pending compaction which is 2");
    }

    @Test
    public void testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan() throws Exception {
        String testPartitionPath = HoodieTestUtils.DEFAULT_PARTITION_PATHS[0];
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().build()).withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(Boolean.valueOf(false)).withAsyncClustering(Boolean.valueOf(false)).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024000L).parquetMaxFileSize(1024000L).build()).build();
        HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(this.metaClient, "001", "1");
        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder().setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
        FileCreateUtils.createRequestedReplaceCommit((String)this.basePath, (String)"002", (Option)Option.of((Object)requestedReplaceMetadata));
        FileCreateUtils.createBaseFile((String)this.basePath, (String)testPartitionPath, (String)"003", (String)"3", (long)1L);
        FileCreateUtils.createCommit((String)this.basePath, (String)"003");
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        List insertRecords = dataGenerator.generateInserts("004", Integer.valueOf(100));
        WorkloadProfile profile = new WorkloadProfile(TestUpsertPartitioner.buildProfile((JavaRDD<HoodieRecord>)this.jsc.parallelize(insertRecords)));
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        UpsertPartitioner partitioner = new UpsertPartitioner(profile, (HoodieEngineContext)this.context, (HoodieTable)table, config);
        Assertions.assertEquals((int)1, (int)partitioner.smallFiles.size(), (String)"Should have 1 small file to be ingested.");
    }

    @Test
    public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() throws Exception {
        String testPartitionPath = HoodieTestUtils.DEFAULT_PARTITION_PATHS[0];
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024L).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024L).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE).withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build()).build()).build();
        FileCreateUtils.createLogFile((String)this.basePath, (String)testPartitionPath, (String)"001", (String)"fg1", (int)1);
        FileCreateUtils.createDeltaCommit((String)this.basePath, (String)"001");
        FileCreateUtils.createBaseFile((String)this.basePath, (String)testPartitionPath, (String)"002", (String)"fg2", (long)1024L);
        FileCreateUtils.createCommit((String)this.basePath, (String)"002");
        FileCreateUtils.createLogFile((String)this.basePath, (String)testPartitionPath, (String)"003", (String)"fg2", (int)1);
        FileCreateUtils.createDeltaCommit((String)this.basePath, (String)"003");
        this.metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
        List insertRecords = dataGenerator.generateInserts("004", Integer.valueOf(1));
        WorkloadProfile profile = new WorkloadProfile(TestUpsertPartitioner.buildProfile((JavaRDD<HoodieRecord>)this.jsc.parallelize(insertRecords)));
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
        SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, this.context, (HoodieTable)table, config);
        Assertions.assertEquals((int)1, (int)partitioner.numPartitions(), (String)"Should have 1 partitions");
        Assertions.assertEquals((Object)BucketType.UPDATE, (Object)partitioner.getBucketInfo((int)0).bucketType, (String)"Bucket 0 should be UPDATE");
        Assertions.assertEquals((Object)"fg1", (Object)partitioner.getBucketInfo((int)0).fileIdPrefix, (String)"Insert should be assigned to fg1");
    }

    @Test
    public void testUpsertPartitionerWithSmallFileHandlingPickingMultipleCandidates() throws Exception {
        String partitionPath = HoodieTestUtils.DEFAULT_PARTITION_PATHS[0];
        HoodieWriteConfig config = this.makeHoodieClientConfigBuilder().withMergeSmallFileGroupCandidatesLimit(3).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(2048L).build()).build();
        FileCreateUtils.createBaseFile((String)this.basePath, (String)partitionPath, (String)"002", (String)"fg-1", (long)1024L);
        FileCreateUtils.createBaseFile((String)this.basePath, (String)partitionPath, (String)"002", (String)"fg-2", (long)1024L);
        FileCreateUtils.createBaseFile((String)this.basePath, (String)partitionPath, (String)"002", (String)"fg-3", (long)1024L);
        FileCreateUtils.createCommit((String)this.basePath, (String)"002");
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
        WorkloadProfile profile = new WorkloadProfile(TestUpsertPartitioner.buildProfile((JavaRDD<HoodieRecord>)this.jsc.parallelize(dataGenerator.generateInserts("003", Integer.valueOf(3)))));
        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context, (HoodieTableMetaClient)reloadedMetaClient);
        SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, this.context, (HoodieTable)table, config);
        Assertions.assertEquals((int)3, (int)partitioner.numPartitions());
        Assertions.assertEquals(Arrays.asList(new BucketInfo(BucketType.UPDATE, "fg-3", partitionPath), new BucketInfo(BucketType.UPDATE, "fg-2", partitionPath), new BucketInfo(BucketType.UPDATE, "fg-1", partitionPath)), (Object)partitioner.getBucketInfos());
    }

    private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema(SCHEMA.toString());
    }
}

