/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.io.Serializable;
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.RDD$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.HoodieUnsafeUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.Alias$;
import org.apache.spark.sql.catalyst.expressions.ExprId;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/**
 * Singleton module class (compiled form of a Scala {@code object}) with helpers for the
 * row-based ({@code Dataset<Row>}) bulk-insert path.
 *
 * <p>It covers three steps:
 * <ul>
 *   <li>preparing an input {@code Dataset} by prepending the Hudi meta columns and optionally
 *       deduplicating it;</li>
 *   <li>optionally dropping partition columns;</li>
 *   <li>writing the prepared rows with {@link BulkInsertDataInternalWriterHelper}.</li>
 * </ul>
 *
 * <p>The single instance is published through {@link #MODULE$} by the constructor.
 */
public final class HoodieDatasetBulkInsertHelper$
implements Logging {
    /** The singleton instance; assigned by the private constructor during static initialization. */
    public static HoodieDatasetBulkInsertHelper$ MODULE$;
    /** Backing field for Spark's {@link Logging} trait; transient so it is not serialized. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new HoodieDatasetBulkInsertHelper$();
    }

    // ---- Spark Logging trait forwarders (delegate to the trait's static implementations) ----

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Prepares {@code df} for a row-based bulk insert.
     *
     * <p>The returned dataset's schema is the five string meta columns
     * {@code _hoodie_commit_time}, {@code _hoodie_commit_seqno}, {@code _hoodie_record_key},
     * {@code _hoodie_partition_path} and {@code _hoodie_file_name}, followed by the columns of
     * {@code df}.
     *
     * <p>When {@code config.populateMetaFields()} is true:
     * <ul>
     *   <li>each Spark partition instantiates the configured key generator via reflection;</li>
     *   <li>the record key and partition path are filled per row;</li>
     *   <li>commit time, sequence number and file name are left as empty strings;</li>
     *   <li>if {@code config.shouldCombineBeforeInsert()} is set, rows are deduplicated (see
     *       {@link #dedupeRows}).</li>
     * </ul>
     * Otherwise all five meta columns are projected as empty-string literals.
     *
     * @param df                         input rows
     * @param config                     write config (meta-field, key-generator, dedupe and
     *                                   parallelism settings)
     * @param partitioner                repartitions the prepared rows
     * @param shouldDropPartitionColumns whether to drop (non-nested) partition path columns
     * @return the prepared dataset, repartitioned with
     *         {@code config.getBulkInsertShuffleParallelism()}
     */
    public Dataset<Row> prepareForBulkInsert(Dataset<Row> df, HoodieWriteConfig config, BulkInsertPartitioner<Dataset<Row>> partitioner, boolean shouldDropPartitionColumns) {
        Dataset<Row> dataset;
        boolean populateMetaFields = config.populateMetaFields();
        StructType schema = df.schema();
        Seq metaFields = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new StructField[]{new StructField("_hoodie_commit_time", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new StructField("_hoodie_commit_seqno", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new StructField("_hoodie_record_key", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new StructField("_hoodie_partition_path", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new StructField("_hoodie_file_name", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())}));
        // Meta fields are prepended, so they occupy ordinals 0..4 of the resulting schema.
        StructType updatedSchema = StructType$.MODULE$.apply((Seq)metaFields.$plus$plus((GenTraversableOnce)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])schema.fields())), Seq$.MODULE$.canBuildFrom()));
        if (populateMetaFields) {
            String keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, "Key-generator class name is required");
            RDD<InternalRow> prependedRdd = df.queryExecution().toRdd().mapPartitions((Function1 & Serializable & scala.Serializable)iter -> {
                // Instantiated per partition, on the executor, rather than shipped from the driver.
                SparkKeyGeneratorInterface keyGenerator = (SparkKeyGeneratorInterface)ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps()));
                return iter.map((Function1 & Serializable & scala.Serializable)row -> {
                    UTF8String recordKey = keyGenerator.getRecordKey((InternalRow)row, schema);
                    UTF8String partitionPath = keyGenerator.getPartitionPath((InternalRow)row, schema);
                    UTF8String commitTimestamp = UTF8String.EMPTY_UTF8;
                    UTF8String commitSeqNo = UTF8String.EMPTY_UTF8;
                    UTF8String filename = UTF8String.EMPTY_UTF8;
                    return new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, (InternalRow)row, false);
                });
            }, df.queryExecution().toRdd().mapPartitions$default$2(), ClassTag$.MODULE$.apply(InternalRow.class));
            RDD<InternalRow> dedupedRdd = config.shouldCombineBeforeInsert() ? this.dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField(), SparkHoodieIndexFactory.isGlobalIndex(config)) : prependedRdd;
            dataset = HoodieUnsafeUtils$.MODULE$.createDataFrameFromRDD(df.sparkSession(), dedupedRdd, updatedSchema);
        } else {
            // Meta fields are not populated: project empty-string stubs in front of the original output.
            LogicalPlan query = df.queryExecution().logical();
            Seq metaFieldsStubs = (Seq)metaFields.map((Function1 & Serializable & scala.Serializable)f -> {
                Literal x$1 = new Literal((Object)UTF8String.EMPTY_UTF8, (DataType)StringType$.MODULE$);
                String x$2 = f.name();
                ExprId x$3 = Alias$.MODULE$.apply$default$3((Expression)x$1, x$2);
                Seq x$4 = Alias$.MODULE$.apply$default$4((Expression)x$1, x$2);
                Option x$5 = Alias$.MODULE$.apply$default$5((Expression)x$1, x$2);
                Seq x$6 = Alias$.MODULE$.apply$default$6((Expression)x$1, x$2);
                return new Alias((Expression)x$1, x$2, x$3, x$4, x$5, x$6);
            }, Seq$.MODULE$.canBuildFrom());
            Project prependedQuery = new Project((Seq)metaFieldsStubs.$plus$plus((GenTraversableOnce)query.output(), Seq$.MODULE$.canBuildFrom()), query);
            dataset = HoodieUnsafeUtils$.MODULE$.createDataFrameFrom(df.sparkSession(), (LogicalPlan)prependedQuery);
        }
        Dataset<Row> updatedDF = dataset;
        Dataset<Row> trimmedDF = shouldDropPartitionColumns ? this.dropPartitionColumns(updatedDF, config) : updatedDF;
        return partitioner.repartitionRecords(trimmedDF, config.getBulkInsertShuffleParallelism());
    }

    /**
     * Writes {@code dataset} as a bulk insert and returns the resulting write statuses.
     *
     * <p>Each Spark partition gets its own {@link BulkInsertDataInternalWriterHelper}. Every row
     * is passed to {@code write}. If writing throws, the writer is aborted and the throwable is
     * rethrown. The writer is always closed, and its write statuses are read only after closing.
     * All statuses are collected to the driver and re-parallelized through the table's engine
     * context.
     *
     * @param dataset                      rows to write (presumably prepared by
     *                                     {@link #prepareForBulkInsert})
     * @param instantTime                  commit instant for this write
     * @param table                        target table; also supplies task context and engine
     *                                     context
     * @param writeConfig                  write config
     * @param partitioner                  repartitions {@code dataset}; also reports whether
     *                                     records within a partition are sorted
     * @param parallelism                  target parallelism for the repartition
     * @param shouldPreserveHoodieMetadata forwarded to the writer helper
     * @return the write statuses of all partitions
     */
    public HoodieData<WriteStatus> bulkInsert(Dataset<Row> dataset, String instantTime, HoodieTable<? extends HoodieRecordPayload<? extends HoodieRecordPayload<?>>, ?, ?, ?> table, HoodieWriteConfig writeConfig, BulkInsertPartitioner<Dataset<Row>> partitioner, int parallelism, boolean shouldPreserveHoodieMetadata) {
        Dataset<Row> repartitionedDataset = partitioner.repartitionRecords(dataset, parallelism);
        boolean arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted();
        StructType schema = dataset.schema();
        WriteStatus[] writeStatuses = (WriteStatus[])repartitionedDataset.queryExecution().toRdd().mapPartitions((Function1 & Serializable & scala.Serializable)iter -> {
            TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
            Integer taskPartitionId = taskContextSupplier.getPartitionIdSupplier().get();
            // NOTE(review): the "taskId" handed to the writer comes from the stage-id supplier; confirm this is intended.
            long taskId = Predef$.MODULE$.Integer2int(taskContextSupplier.getStageIdSupplier().get());
            Long taskEpochId = taskContextSupplier.getAttemptIdSupplier().get();
            // Declared outside the try so the write statuses can still be read after the writer is closed.
            final BulkInsertDataInternalWriterHelper writer = new BulkInsertDataInternalWriterHelper(table, writeConfig, instantTime, Predef$.MODULE$.Integer2int(taskPartitionId), taskId, Predef$.MODULE$.Long2long(taskEpochId), schema, writeConfig.populateMetaFields(), arePartitionRecordsSorted, shouldPreserveHoodieMetadata);
            try {
                iter.foreach((Function1 & Serializable & scala.Serializable)row -> {
                    writer.write(row);
                    return BoxedUnit.UNIT;
                });
            }
            catch (Throwable t) {
                // Abort the partially written output, then propagate the original failure.
                writer.abort();
                throw t;
            }
            finally {
                writer.close();
            }
            return ((IterableLike)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(writer.getWriteStatuses()).asScala()).map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.toWriteStatus(), Buffer$.MODULE$.canBuildFrom())).iterator();
        }, repartitionedDataset.queryExecution().toRdd().mapPartitions$default$2(), ClassTag$.MODULE$.apply(WriteStatus.class)).collect();
        return table.getContext().parallelize((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])writeStatuses)).toList()).asJava());
    }

    /**
     * Deduplicates rows that already carry the Hudi meta columns.
     *
     * <p>The dedupe key is {@code _hoodie_record_key} when {@code isGlobalIndex} is true.
     * Otherwise it is {@code "<_hoodie_partition_path>:<_hoodie_record_key>"}. For each key the row
     * with the greater pre-combine value (via {@link Comparable#compareTo}) is kept. On a tie the
     * first argument of the reduce function wins; which row that is depends on the order in which
     * {@code reduceByKey} combines values.
     *
     * <p>NOTE(review): a null pre-combine value throws a NullPointerException in the reducer.
     *
     * @param rdd                rows including meta columns
     * @param schema             schema of {@code rdd} (must contain the meta columns)
     * @param preCombineFieldRef possibly nested field reference resolved against {@code schema}
     * @param isGlobalIndex      whether to dedupe on record key alone
     * @return one row per dedupe key
     */
    private RDD<InternalRow> dedupeRows(RDD<InternalRow> rdd, StructType schema, String preCombineFieldRef, boolean isGlobalIndex) {
        int recordKeyMetaFieldOrd = schema.fieldIndex("_hoodie_record_key");
        int partitionPathMetaFieldOrd = schema.fieldIndex("_hoodie_partition_path");
        HoodieUnsafeRowUtils.NestedFieldPath preCombineFieldPath = HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, preCombineFieldRef);
        return RDD$.MODULE$.rddToPairRDDFunctions(RDD$.MODULE$.rddToPairRDDFunctions(rdd.map((Function1 & Serializable & scala.Serializable)row -> {
            String string;
            if (isGlobalIndex) {
                string = row.getString(recordKeyMetaFieldOrd);
            } else {
                String partitionPath = row.getString(partitionPathMetaFieldOrd);
                String recordKey = row.getString(recordKeyMetaFieldOrd);
                string = new StringBuilder(1).append(partitionPath).append(":").append(recordKey).toString();
            }
            String rowKey = string;
            // Copy the row before it is retained in the shuffle (presumably because upstream iterators reuse row instances).
            return new Tuple2((Object)rowKey, (Object)row.copy());
        }, ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(InternalRow.class), (Ordering)Ordering.String$.MODULE$).reduceByKey((Function2 & Serializable & scala.Serializable)(oneRow, otherRow) -> {
            Comparable otherPreCombineVal;
            Comparable onePreCombineVal = (Comparable)HoodieUnsafeRowUtils$.MODULE$.getNestedInternalRowValue((InternalRow)oneRow, preCombineFieldPath);
            return onePreCombineVal.compareTo(otherPreCombineVal = (Comparable)HoodieUnsafeRowUtils$.MODULE$.getNestedInternalRowValue((InternalRow)otherRow, preCombineFieldPath)) >= 0 ? oneRow : otherRow;
        }), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(InternalRow.class), (Ordering)Ordering.String$.MODULE$).values();
    }

    /**
     * Drops the configured partition path columns from {@code df}.
     *
     * <p>Nested partition path fields (names containing {@code '.'}) cannot be dropped. They are
     * kept, and a warning listing them is logged.
     *
     * @param df     dataset to trim
     * @param config write config used to resolve the key generator's partition path fields
     * @return {@code df} without its top-level partition path columns
     */
    private Dataset<Row> dropPartitionColumns(Dataset<Row> df, HoodieWriteConfig config) {
        Set partitionPathFields = this.getPartitionPathFields(config).toSet();
        Set nestedPartitionPathFields = (Set)partitionPathFields.filter((Function1 & Serializable & scala.Serializable)f -> BoxesRunTime.boxToBoolean((boolean)HoodieDatasetBulkInsertHelper$.$anonfun$dropPartitionColumns$1(f)));
        if (nestedPartitionPathFields.nonEmpty()) {
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("Can not drop nested partition path fields: ").append(nestedPartitionPathFields).toString());
        }
        Seq partitionPathCols = ((SetLike)partitionPathFields.$minus$minus((GenTraversableOnce)nestedPartitionPathFields)).toSeq();
        return df.drop(partitionPathCols);
    }

    /**
     * Returns the partition path fields of the configured key generator.
     *
     * <p>The key-generator class name is read with {@code getStringOrThrow} and the same message
     * used by {@link #prepareForBulkInsert}, so a missing setting fails clearly instead of passing
     * {@code null} to reflection. The class is cast to {@link BuiltinKeyGenerator}; any other key
     * generator type fails with a ClassCastException.
     *
     * @param config write config holding the key-generator class name and its properties
     * @return the partition path field names
     */
    private Seq<String> getPartitionPathFields(HoodieWriteConfig config) {
        String keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, "Key-generator class name is required");
        BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator)ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps()));
        return (Seq)JavaConverters$.MODULE$.asScalaBufferConverter(keyGenerator.getPartitionPathFields()).asScala();
    }

    /** Predicate used by {@link #dropPartitionColumns}: true when the field name contains {@code '.'} (a nested field). */
    public static final /* synthetic */ boolean $anonfun$dropPartitionColumns$1(String f) {
        return new StringOps(Predef$.MODULE$.augmentString(f)).contains((Object)BoxesRunTime.boxToCharacter((char)'.'));
    }

    /** Publishes the singleton through {@link #MODULE$} and initializes the Logging trait state. */
    private HoodieDatasetBulkInsertHelper$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
    }
}

