/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseBulkInsertHelper;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.spark.api.java.JavaRDD;

public class SparkBulkInsertHelper<T, R>
extends BaseBulkInsertHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
    private SparkBulkInsertHelper() {
        super(HoodieData::getNumPartitions);
    }

    public static SparkBulkInsertHelper newInstance() {
        return BulkInsertHelperHolder.HOODIE_BULK_INSERT_HELPER;
    }

    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords, String instantTime, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, HoodieWriteConfig config, BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> executor, boolean performDedupe, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
        HoodieWriteMetadata<HoodieData<WriteStatus>> result2 = new HoodieWriteMetadata<HoodieData<WriteStatus>>();
        table.getActiveTimeline().transitionRequestedToInflight(table.getInstantGenerator().createNewInstant(HoodieInstant.State.REQUESTED, executor.getCommitActionType(), instantTime), Option.empty(), config.shouldAllowMultiWriteOnSameInstant());
        BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElseGet(() -> BulkInsertInternalPartitionerFactory.get(table, config));
        HoodieData<WriteStatus> writeStatuses = this.bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), (WriteHandleFactory)new CreateHandleFactory(false));
        ((BaseSparkCommitActionExecutor)executor).updateIndexAndCommitIfNeeded(writeStatuses, result2);
        return result2;
    }

    public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords, String instantTime, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, HoodieWriteConfig config, boolean performDedupe, BulkInsertPartitioner partitioner, boolean useWriterSchema, int parallelism) {
        return this.bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, useWriterSchema, parallelism, (WriteHandleFactory)null);
    }

    @Override
    public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords, String instantTime, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, HoodieWriteConfig config, boolean performDedupe, BulkInsertPartitioner partitioner, boolean useWriterSchema, int configuredParallelism, WriteHandleFactory writeHandleFactory) {
        HoodieData<HoodieRecord<T>> dedupedRecords = inputRecords;
        int targetParallelism = this.deduceShuffleParallelism(inputRecords, configuredParallelism);
        if (performDedupe) {
            dedupedRecords = HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, targetParallelism, table);
        }
        HoodieJavaRDD<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of(partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), targetParallelism));
        JavaRDD writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords).mapPartitionsWithIndex(new BulkInsertMapFunction(instantTime, partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true).flatMap(List::iterator);
        return HoodieJavaRDD.of(writeStatusRDD);
    }

    private static class BulkInsertHelperHolder {
        private static final SparkBulkInsertHelper HOODIE_BULK_INSERT_HELPER = new SparkBulkInsertHelper();

        private BulkInsertHelperHolder() {
        }
    }
}

