/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005f\u0001\u0002\b\u0010\u0001aAQa\b\u0001\u0005\u0002\u0001BQa\t\u0001\u0005B\u0011Bqa\u000b\u0001C\u0002\u0013\u0005A\u0006\u0003\u00046\u0001\u0001\u0006I!\f\u0005\bm\u0001\u0011\r\u0011\"\u00018\u0011\u0019A\u0005\u0001)A\u0005q!)\u0011\n\u0001C\u0001\u0015\")q\u000f\u0001C\u0001q\"9\u00111\u0007\u0001\u0005\u0002\u0005U\u0002bBA6\u0001\u0011\u0005\u0011Q\u000e\u0005\b\u0003o\u0002A\u0011AA=\u0011\u001d\t\t\t\u0001C\u0001\u0003\u0007Cq!!&\u0001\t\u0003\t9JA\nUKN$8\u000b]1sW\u0012\u000bG/Y*pkJ\u001cWM\u0003\u0002\u0011#\u0005Qa-\u001e8di&|g.\u00197\u000b\u0005I\u0019\u0012\u0001\u00025vI&T!\u0001F\u000b\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00051\u0012aA8sO\u000e\u00011C\u0001\u0001\u001a!\tQR$D\u0001\u001c\u0015\ta\u0012#A\u0005uKN$X\u000f^5mg&\u0011ad\u0007\u0002!'B\f'o[\"mS\u0016tGOR;oGRLwN\\1m)\u0016\u001cH\u000fS1s]\u0016\u001c8/\u0001\u0004=S:LGO\u0010\u000b\u0002CA\u0011!\u0005A\u0007\u0002\u001f\u0005!1m\u001c8g)\u0005)\u0003C\u0001\u0014*\u001b\u00059#B\u0001\u0015\u0014\u0003\u0015\u0019\b/\u0019:l\u0013\tQsEA\u0005Ta\u0006\u00148nQ8oM\u0006Y\u0001/\u0019:bY2,G.[:n+\u0005i\u0003C\u0001\u00184\u001b\u0005y#B\u0001\u00192\u0003\u0011a\u0017M\\4\u000b\u0003I\nAA[1wC&\u0011Ag\f\u0002\b\u0013:$XmZ3s\u00031\u0001\u0018M]1mY\u0016d\u0017n]7!\u0003)\u0019w.\\7p]>\u0003Ho]\u000b\u0002qA!\u0011HQ#F\u001d\tQ\u0004\t\u0005\u0002<}5\tAH\u0003\u0002>/\u00051AH]8pizR\u0011aP\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0003z\na\u0001\u0015:fI\u00164\u0017BA\"E\u0005\ri\u0015\r\u001d\u0006\u0003\u0003z\u0002\"!\u000f$\n\u0005\u001d#%AB*ue&tw-A\u0006d_6lwN\\(qiN\u0004\u0013\u0001\u0004;fgR\u001cuN]3GY><H\u0003B&P#N\u0003\"\u0001T'\u000e\u0003yJ!A\u0014 
\u0003\tUs\u0017\u000e\u001e\u0005\u0006!\u001e\u0001\r!R\u0001\ni\u0006\u0014G.\u001a+za\u0016DQAU\u0004A\u0002\u0015\u000b1b[3z\u000f\u0016t7\t\\1tg\")Ak\u0002a\u0001\u000b\u0006I\u0011N\u001c3fqRK\b/\u001a\u0015\u0007\u000fY\u00137\r]9\u0011\u0005]\u0003W\"\u0001-\u000b\u0005eS\u0016\u0001\u00039s_ZLG-\u001a:\u000b\u0005mc\u0016A\u00029be\u0006l7O\u0003\u0002^=\u00069!.\u001e9ji\u0016\u0014(BA0\u0016\u0003\u0015QWO\\5u\u0013\t\t\u0007LA\u0005DgZ\u001cv.\u001e:dK\u0006)a/\u00197vK22AM\u001a5kY:\f\u0013!Z\u0001>\u0007>\u0003\u0016lX(O?^\u0013\u0016\nV#}_J<g&\u00199bG\",g\u0006[;eS:ZW-_4f]:\u001a\u0016.\u001c9mK.+\u0017pR3oKJ\fGo\u001c:}\u00052{u*T\u0011\u0002O\u0006q4i\u0014)Z?>sul\u0016*J)\u0016cxN]4/CB\f7\r[3/QV$\u0017NL6fs\u001e,gNL*j[BdWmS3z\u000f\u0016tWM]1u_Jd8+S'Q\u0019\u0016\u000b\u0013![\u0001M\u0007>\u0003\u0016lX(O?^\u0013\u0016\nV#}_J<g&\u00199bG\",g\u0006[;eS:ZW-_4f]:ruN\u001c9beRLG/[8oK\u0012\\U-_$f]\u0016\u0014\u0018\r^8sy\u001ecuJQ!M?\ncujT'\"\u0003-\fQ(T#S\u000f\u0016{vJT0S\u000b\u0006#Ep\u001c:h]\u0005\u0004\u0018m\u00195f]!,H-\u001b\u0018lKf<WM\u001c\u0018TS6\u0004H.Z&fs\u001e+g.\u001a:bi>\u0014HP\u0011'P\u001f6\u000b\u0013!\\\u0001?\u001b\u0016\u0013v)R0P\u001d~\u0013V)\u0011#}_J<g&\u00199bG\",g\u0006[;eS:ZW-_4f]:\u001a\u0016.\u001c9mK.+\u0017pR3oKJ\fGo\u001c:}'&k\u0005\u000bT#\"\u0003=\fA*T#S\u000f\u0016{vJT0S\u000b\u0006#Ep\u001c:h]\u0005\u0004\u0018m\u00195f]!,H-\u001b\u0018lKf<WM\u001c\u0018O_:\u0004\u0018M\u001d;ji&|g.\u001a3LKf<UM\\3sCR|'\u000f`$M\u001f\n\u000bEj\u0018\"M\u001f>k\u0015!\u00033fY&l\u0017\u000e^3s9\u0005a\bFA\u0004t!\t!X/D\u0001[\u0013\t1(LA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fQ\u0003^3ti&kW.\u001e;bE2,Wk]3s\r2|w\u000fF\u0003LsjdX\u0010C\u0003Q\u0011\u0001\u0007Q\tC\u0003|\u0011\u0001\u0007Q)A\u0005pa\u0016\u0014\u0018\r^5p]\")!\u000b\u0003a\u0001\u000b\")A\u000b\u0003a\u0001\u000b\"2\u0001B\u00162\u0000aFd\u0003$!\u0001\u0002\u0006\u0005%\u0011QBA\t\u0003+\tI\"!\b\u0002\"\u0005\u0015\u0012\u0011FA\u0017C\t\t\u0019!\u0001#D\u0
01fBKvl\u0014(`/JKE+\u0012?j]N,'\u000f\u001e?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]MKW\u000e\u001d7f\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?C\u0019>{U*\t\u0002\u0002\b\u0005)5i\u0014)Z?>sul\u0016*J)\u0016c\u0018N\\:feRdxN]4/CB\f7\r[3/QV$\u0017NL6fs\u001e,gNL*j[BdWmS3z\u000f\u0016tWM]1u_Jd8+S'Q\u0019\u0016\u000b#!a\u0003\u0002'\u000e{\u0005+W0P\u001d~;&+\u0013+Fy&t7/\u001a:uy>\u0014xML1qC\u000eDWM\f5vI&t3.Z=hK:tcj\u001c8qCJ$\u0018\u000e^5p]\u0016$7*Z=HK:,'/\u0019;per<Ej\u0014\"B\u0019~\u0013EjT(NC\t\ty!\u0001#N\u000bJ;UiX(O?J+\u0015\t\u0012?j]N,'\u000f\u001e?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]MKW\u000e\u001d7f\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?C\u0019>{U*\t\u0002\u0002\u0014\u0005)U*\u0012*H\u000b~{ej\u0018*F\u0003\u0012c\u0018N\\:feRdxN]4/CB\f7\r[3/QV$\u0017NL6fs\u001e,gNL*j[BdWmS3z\u000f\u0016tWM]1u_Jd8+S'Q\u0019\u0016\u000b#!a\u0006\u0002'6+%kR#`\u001f:{&+R!Ey&t7/\u001a:uy>\u0014xML1qC\u000eDWM\f5vI&t3.Z=hK:tcj\u001c8qCJ$\u0018\u000e^5p]\u0016$7*Z=HK:,'/\u0019;per<Ej\u0014\"B\u0019~\u0013EjT(NC\t\tY\"A%D\u001fBKvl\u0014(`/JKE+\u0012?ck2\\w,\u001b8tKJ$Hp\u001c:h]\u0005\u0004\u0018m\u00195f]!,H-\u001b\u0018lKf<WM\u001c\u0018TS6\u0004H.Z&fs\u001e+g.\u001a:bi>\u0014HP\u0011'P\u001f6\u000b#!a\b\u0002\u0015\u000e{\u0005+W0P\u001d~;&+\u0013+Fy\n,Hn[0j]N,'\u000f\u001e?pe\u001et\u0013\r]1dQ\u0016t\u0003.\u001e3j]-,\u0017pZ3o]MKW\u000e\u001d7f\u0017\u0016Lx)\u001a8fe\u0006$xN\u001d?T\u00136\u0003F*R\u0011\u0003\u0003G\t\u0001lQ(Q3~{ejX,S\u0013R+EPY;mW~Kgn]3sir|'o\u001a\u0018ba\u0006\u001c\u0007.\u001a\u0018ik\u0012Lgf[3zO\u0016tgFT8oa\u0006\u0014H/\u001b;j_:,GmS3z\u000f\u0016tWM]1u_Jdx\tT(C\u00032{&\tT(P\u001b\u0006\u0012\u0011qE\u0001J\u001b\u0016\u0013v)R0P\u001d~\u0013V)\u0011#}EVd7nX5og\u0016\u0014H\u000f`8sO:\n\u0007/Y2iK:BW\u000fZ5/W\u0016Lx-\u001a8/'&l\u0007\u000f\\3LKf<UM\\3sCR|'\u000f 
\"M\u001f>k\u0015EAA\u0016\u0003)kUIU$F?>suLU#B\tr\u0014W\u000f\\6`S:\u001cXM\u001d;}_J<g&\u00199bG\",g\u0006[;eS:ZW-_4f]:\u001a\u0016.\u001c9mK.+\u0017pR3oKJ\fGo\u001c:}'&k\u0005\u000bT#\"\u0005\u0005=\u0012\u0001W'F%\u001e+ul\u0014(`%\u0016\u000bE\t 2vY.|\u0016N\\:feRdxN]4/CB\f7\r[3/QV$\u0017NL6fs\u001e,gN\f(p]B\f'\u000f^5uS>tW\rZ&fs\u001e+g.\u001a:bi>\u0014Hp\u0012'P\u0005\u0006cuL\u0011'P\u001f6C#\u0001C:\u00023\r|W\u000e]1sKV\u0003H-\u0019;f\t\u001a<\u0016\u000e\u001e5Ik\u0012LGI\u001a\u000b\n\u0017\u0006]\u0012QJA)\u0003OBq!!\u000f\n\u0001\u0004\tY$A\u0004j]B,H\u000f\u00124\u0011\r\u0005u\u00121IA$\u001b\t\tyDC\u0002\u0002B\u001d\n1a]9m\u0013\u0011\t)%a\u0010\u0003\u000f\u0011\u000bG/Y:fiB!\u0011QHA%\u0013\u0011\tY%a\u0010\u0003\u0007I{w\u000fC\u0004\u0002P%\u0001\r!a\u000f\u0002\r!,H-\u001b#g\u0011\u001d\t\u0019&\u0003a\u0001\u0003+\n!BY3g_J,'k\\<t!\u0019\t9&!\u0019\u0002H9!\u0011\u0011LA/\u001d\rY\u00141L\u0005\u0002\u007f%\u0019\u0011q\f \u0002\u000fA\f7m[1hK&!\u00111MA3\u0005\u0011a\u0015n\u001d;\u000b\u0007\u0005}c\b\u0003\u0004\u0002j%\u0001\r!R\u0001\u000eG>d7\u000fV8D_6\u0004\u0018M]3\u0002A\r|W\u000e]1sK\u0016sG/\u001b:f\u0013:\u0004X\u000f\u001e*poN<\u0016\u000e\u001e5Ik\u0012LGI\u001a\u000b\b\u0017\u0006=\u00141OA;\u0011\u001d\t\tH\u0003a\u0001\u0003+\n\u0011\"\u001b8qkR\u0014vn^:\t\u000f\u0005=#\u00021\u0001\u0002<!1\u0011\u0011\u000e\u0006A\u0002\u0015\u000badY8na\u0006\u0014X-\u00128uSJ,\u0017J\u001c9vi\u00123w+\u001b;i\u0011V$\u0017\u000e\u00124\u0015\u000f-\u000bY(! 
\u0002\u0000!9\u0011\u0011H\u0006A\u0002\u0005m\u0002bBA(\u0017\u0001\u0007\u00111\b\u0005\u0007\u0003SZ\u0001\u0019A#\u0002/\u0011|Wj\u0014*SK\u0006$w\n\u001d;j[&TX\rZ)vKJLHcB&\u0002\u0006\u0006\u001d\u00151\u0012\u0005\b\u0003sa\u0001\u0019AA\u001e\u0011\u0019\tI\t\u0004a\u0001\u000b\u0006a1m\u001c7t)>\u001cV\r\\3di\"9\u0011Q\u0012\u0007A\u0002\u0005=\u0015aF5t\u001b\u0016$\u0018\rZ1uC\u0016s\u0017M\u00197fI>s'+Z1e!\ra\u0015\u0011S\u0005\u0004\u0003's$a\u0002\"p_2,\u0017M\\\u0001\u000fG>l\u0007/\u0019:f%>\u000be\u000e\u001a*U)\u001dY\u0015\u0011TAO\u0003?Ca!a'\u000e\u0001\u0004)\u0015\u0001\u00032bg\u0016\u0004\u0016\r\u001e5\t\r\u0005%T\u00021\u0001F\u0011\u001d\ti)\u0004a\u0001\u0003\u001f\u0003")
public class TestSparkDataSource
extends SparkClientFunctionalTestHarness {
    // Boxed parallelism (4) used for the four shuffle-parallelism write options below and
    // as the slice count when generated records are parallelized in the tests.
    // Must be declared before commonOpts: that initializer reads it through parallelism().
    private final Integer parallelism = Predef$.MODULE$.int2Integer(4);
    // Immutable Scala map of write options shared by every test in this class:
    // insert/upsert/bulk-insert/delete shuffle parallelism, the record key field ("_row_key"),
    // the precombine field ("timestamp") and the table name ("hoodie_test").
    private final scala.collection.immutable.Map<String, String> commonOpts = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.insert.shuffle.parallelism"), (Object)String.valueOf(this.parallelism())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.upsert.shuffle.parallelism"), (Object)String.valueOf(this.parallelism())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.bulkinsert.shuffle.parallelism"), (Object)String.valueOf(this.parallelism())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.delete.shuffle.parallelism"), (Object)String.valueOf(this.parallelism())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), (Object)"_row_key"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), (Object)"timestamp"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieWriteConfig.TBL_NAME.key()), (Object)"hoodie_test")}));

    /**
     * Returns the Spark configuration for this test class.
     *
     * <p>Delegates to the one-argument {@code conf(...)} overload, passing the settings
     * returned by {@code SparkClientFunctionalTestHarness.getSparkSqlConf()}.
     *
     * @return the resulting {@link SparkConf}
     */
    public SparkConf conf() {
        return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
    }

    /**
     * Accessor for the parallelism value shared by the write options and the test RDDs.
     *
     * @return the boxed parallelism held by this instance
     */
    public Integer parallelism() {
        return parallelism;
    }

    /**
     * Accessor for the base write options that every test extends with its own settings.
     *
     * @return the immutable Scala map of common write options
     */
    public scala.collection.immutable.Map<String, String> commonOpts() {
        return commonOpts;
    }

    /**
     * End-to-end write/read flow, parameterized over table type, key generator class and index type.
     *
     * <p>Sequence exercised against {@code basePath()}:
     * <ol>
     *   <li>bulk-insert 10 generated records (overwrite) and verify the snapshot read;</li>
     *   <li>append 5 unique updates, then 6 unique updates, verifying after each that the
     *       snapshot still holds 10 rows and reflects the update batch;</li>
     *   <li>run incremental queries bounded to the first and to the third commit;</li>
     *   <li>run a time-travel query as of the second commit;</li>
     *   <li>for {@code MERGE_ON_READ} only: check the read-optimized view, write a further batch
     *       with inline compaction enabled, and compare read-optimized vs. default reads.</li>
     * </ol>
     *
     * @param tableType   value written as the table type option; {@code "MERGE_ON_READ"} enables the extra checks
     * @param keyGenClass fully-qualified key generator class name
     * @param indexType   value written as the index type option
     */
    @ParameterizedTest
    // delimiter=124 is the '|' character used to separate the columns of each CSV row below.
    @CsvSource(value={"COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", "COPY_ON_WRITE|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", "MERGE_ON_READ|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"}, delimiter=124)
    public void testCoreFlow(String tableType, String keyGenClass, String indexType) {
        boolean isMetadataEnabledOnWrite = true;
        boolean isMetadataEnabledOnRead = true;
        // The non-partitioned key generator gets an empty partition path field.
        String partitionField = NonpartitionedKeyGenerator.class.getName().equals(keyGenClass) ? "" : "partition";
        // Common options plus metadata-enable, key generator, partition path field, table type and index type.
        scala.collection.immutable.Map options = this.commonOpts().$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieMetadataConfig.ENABLE.key()), (Object)String.valueOf(isMetadataEnabledOnWrite))).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.KEYGENERATOR_CLASS_NAME().key()), (Object)keyGenClass)).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)partitionField)).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)tableType)).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieIndexConfig.INDEX_TYPE.key()), (Object)indexType));
        // Projection used by every comparison helper; spliced verbatim into "select ... from <view>".
        String colsToSelect = "_row_key, begin_lat,  begin_lon, city_to_state.LA, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare.amount, fare.currency, partition, partition_path, rider, timestamp, weight, _hoodie_is_deleted";
        // NOTE(review): 57069L is presumably a fixed seed for reproducible data — confirm against HoodieTestDataGenerator.
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(57069L);
        FileSystem fs = HadoopFSUtils.getFs((String)this.basePath(), (Configuration)this.spark().sparkContext().hadoopConfiguration());
        // --- Write 1: bulk insert of 10 generated records, overwriting anything at basePath ---
        List records0 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(10)))).asScala()).toList();
        Dataset inputDf0 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records0, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        inputDf0.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL()).mode(SaveMode.Overwrite).save(this.basePath());
        // Remembered for the first incremental query further down.
        String commitCompletionTime1 = DataSourceTestUtils.latestCommitCompletionTime(fs, this.basePath());
        Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((FileSystem)fs, (String)this.basePath(), (String)"000"));
        Dataset snapshotDf1 = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).cache();
        Assertions.assertEquals((long)10L, (long)snapshotDf1.count());
        this.compareEntireInputDfWithHudiDf((Dataset<Row>)inputDf0, (Dataset<Row>)snapshotDf1, colsToSelect);
        // Materialise the rows before unpersisting; they serve as the "before" image for the next comparison.
        List snapshotRows1 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])snapshotDf1.collect())).toList();
        snapshotDf1.unpersist(true);
        // --- Write 2: 5 unique updates, appended ---
        // No explicit operation is set here, so the writer's default applies (presumably upsert — confirm).
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(5)))).asScala()).toList();
        Dataset updateDf = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        updateDf.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath());
        // Remembered for the time-travel query further down.
        String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit((FileSystem)fs, (String)this.basePath());
        Dataset snapshotDf2 = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).cache();
        Assertions.assertEquals((long)10L, (long)snapshotDf2.count());
        this.compareUpdateDfWithHudiDf((Dataset<Row>)updateDf, (Dataset<Row>)snapshotDf2, (List<Row>)snapshotRows1, colsToSelect);
        List snapshotRows2 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])snapshotDf2.collect())).toList();
        snapshotDf2.unpersist(true);
        // --- Write 3: 6 unique updates, appended ---
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateUniqueUpdates("002", Predef$.MODULE$.int2Integer(6)))).asScala()).toList();
        Dataset inputDf2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        // Distinct record keys in this batch; the expected size of the incremental pull for this commit.
        long uniqueKeyCnt2 = inputDf2.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).distinct().count();
        inputDf2.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath());
        String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit((FileSystem)fs, (String)this.basePath());
        String commitCompletionTime3 = DataSourceTestUtils.latestCommitCompletionTime(fs, this.basePath());
        Assertions.assertEquals((int)3, (int)HoodieDataSourceHelpers.listCommitsSince((FileSystem)fs, (String)this.basePath(), (String)"000").size());
        Dataset snapshotDf3 = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).cache();
        Assertions.assertEquals((long)10L, (long)snapshotDf3.count(), (String)"should still be 10, since we only updated");
        this.compareUpdateDfWithHudiDf((Dataset<Row>)inputDf2, (Dataset<Row>)snapshotDf3, (List<Row>)snapshotRows2, colsToSelect);
        snapshotDf3.unpersist(true);
        // --- Incremental query bounded to the first commit (start == end == its completion time) ---
        String firstCommit = (String)HoodieDataSourceHelpers.listCommitsSince((FileSystem)fs, (String)this.basePath(), (String)"000").get(0);
        Dataset hoodieIncViewDf1 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.START_COMMIT().key(), commitCompletionTime1).option(DataSourceReadOptions$.MODULE$.END_COMMIT().key(), commitCompletionTime1).option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath());
        Assertions.assertEquals((long)10L, (long)hoodieIncViewDf1.count(), (String)"should have pulled 10 initial inserts");
        // All pulled rows must carry a single commit time, and it must be the first commit's.
        Row[] countsPerCommit = (Row[])hoodieIncViewDf1.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
        Assertions.assertEquals((int)1, (int)countsPerCommit.length);
        Assertions.assertEquals((Object)firstCommit, (Object)countsPerCommit[0].get(0));
        // --- Write 4: 8 unique updates, appended ---
        // NOTE(review): written before the next incremental query, presumably so that query has a
        // later commit it must exclude — confirm intent.
        List records3 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateUniqueUpdates("003", Predef$.MODULE$.int2Integer(8)))).asScala()).toList();
        Dataset inputDf3 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records3, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        inputDf3.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath());
        // --- Incremental query bounded to the third commit ---
        Dataset hoodieIncViewDf2 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.START_COMMIT().key(), commitCompletionTime3).option(DataSourceReadOptions$.MODULE$.END_COMMIT().key(), commitCompletionTime3).option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath());
        Assertions.assertEquals((long)uniqueKeyCnt2, (long)hoodieIncViewDf2.count(), (String)"should have pulled 6 records");
        countsPerCommit = (Row[])hoodieIncViewDf2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
        Assertions.assertEquals((int)1, (int)countsPerCommit.length);
        Assertions.assertEquals((Object)commitInstantTime3, (Object)countsPerCommit[0].get(0));
        // --- Time travel to the second commit: must reproduce the rows captured in snapshotRows2 ---
        // Note: unlike the other reads, this one does not set the metadata-enable option.
        Dataset timeTravelDf = this.spark().read().format("org.apache.hudi").option("as.of.instant", commitInstantTime2).load(this.basePath()).cache();
        Assertions.assertEquals((long)10L, (long)timeTravelDf.count());
        this.compareEntireInputRowsWithHudiDf((List<Row>)snapshotRows2, (Dataset<Row>)timeTravelDf, colsToSelect);
        timeTravelDf.unpersist(true);
        // --- MERGE_ON_READ-only checks ---
        if (tableType.equals("MERGE_ON_READ")) {
            // The read-optimized view is compared against the very first batch (inputDf0).
            this.doMORReadOptimizedQuery((Dataset<Row>)inputDf0, colsToSelect, isMetadataEnabledOnRead);
            List snapshotRows4 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).collect())).toList();
            Assertions.assertEquals((int)10, (int)snapshotRows4.length());
            // Write 5: 4 unique updates with inline compaction switched on (delta-commit threshold "3").
            List records4 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateUniqueUpdates("004", Predef$.MODULE$.int2Integer(4)))).asScala()).toList();
            Dataset inputDf4 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records4, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
            inputDf4.write().format("org.apache.hudi").options((Map)options).option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3").mode(SaveMode.Append).save(this.basePath());
            Dataset snapshotDf5 = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).cache();
            this.compareUpdateDfWithHudiDf((Dataset<Row>)inputDf4, (Dataset<Row>)snapshotDf5, (List<Row>)snapshotRows4, colsToSelect);
            inputDf4.unpersist(true);
            snapshotDf5.unpersist(true);
            // After the write above, read-optimized rows are expected to all appear in the default read.
            this.compareROAndRT(this.basePath(), colsToSelect, isMetadataEnabledOnRead);
        }
        // Release the cached input batches. Not reached if an assertion above fails.
        inputDf0.unpersist(true);
        updateDf.unpersist(true);
        inputDf2.unpersist(true);
        inputDf3.unpersist(true);
    }

    /**
     * Insert-only flow, parameterized over table type, write operation, key generator class and index type.
     *
     * <p>Writes three batches of freshly generated inserts (10, 5 and 6 records) with the given
     * operation and checks after each that the snapshot read holds the running total
     * (10, 15, 21 rows); after the second and third batches it also checks that the snapshot
     * matches the union of the batches written so far.
     *
     * @param tableType   value written as the table type option
     * @param operation   value written as the write operation option ({@code insert} or {@code bulk_insert} in the CSV source)
     * @param keyGenClass fully-qualified key generator class name
     * @param indexType   value written as the index type option
     */
    @ParameterizedTest
    // delimiter=124 is the '|' character used to separate the columns of each CSV row below.
    @CsvSource(value={"COPY_ON_WRITE|insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", "COPY_ON_WRITE|insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", "COPY_ON_WRITE|insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", "MERGE_ON_READ|insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", "MERGE_ON_READ|insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", "MERGE_ON_READ|insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"}, delimiter=124)
    public void testImmutableUserFlow(String tableType, String operation, String keyGenClass, String indexType) {
        boolean isMetadataEnabledOnWrite = true;
        boolean isMetadataEnabledOnRead = true;
        // The non-partitioned key generator gets an empty partition path field.
        String partitionField = NonpartitionedKeyGenerator.class.getName().equals(keyGenClass) ? "" : "partition";
        // Common options plus metadata-enable, key generator, partition path field, table type and index type.
        scala.collection.immutable.Map options = this.commonOpts().$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieMetadataConfig.ENABLE.key()), (Object)String.valueOf(isMetadataEnabledOnWrite))).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.KEYGENERATOR_CLASS_NAME().key()), (Object)keyGenClass)).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)partitionField)).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)tableType)).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieIndexConfig.INDEX_TYPE.key()), (Object)indexType));
        // Projection used by the comparison helper; spliced verbatim into "select ... from <view>".
        String colsToSelect = "_row_key, begin_lat,  begin_lon, city_to_state.LA, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare.amount, fare.currency, partition, partition_path, rider, timestamp, weight, _hoodie_is_deleted";
        // NOTE(review): 57069L is presumably a fixed seed for reproducible data — confirm against HoodieTestDataGenerator.
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(57069L);
        FileSystem fs = HadoopFSUtils.getFs((String)this.basePath(), (Configuration)this.spark().sparkContext().hadoopConfiguration());
        // --- Write 1: 10 inserts, overwriting anything at basePath ---
        List records0 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(10)))).asScala()).toList();
        Dataset inputDf0 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records0, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        inputDf0.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation).mode(SaveMode.Overwrite).save(this.basePath());
        Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((FileSystem)fs, (String)this.basePath(), (String)"000"));
        // Only the row count is checked for the first snapshot, so it is neither cached nor compared.
        Dataset snapshotDf1 = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath());
        Assertions.assertEquals((long)10L, (long)snapshotDf1.count());
        // --- Write 2: 5 more inserts, appended ---
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(5)))).asScala()).toList();
        Dataset inputDf1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        inputDf1.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation).mode(SaveMode.Append).save(this.basePath());
        // NOTE(review): this read uses the short "hudi" format name while the others use
        // "org.apache.hudi" — presumably equivalent aliases; confirm.
        Dataset snapshotDf2 = this.spark().read().format("hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).cache();
        Assertions.assertEquals((long)15L, (long)snapshotDf2.count());
        this.compareEntireInputDfWithHudiDf((Dataset<Row>)inputDf1.union(inputDf0), (Dataset<Row>)snapshotDf2, colsToSelect);
        snapshotDf2.unpersist(true);
        // --- Write 3: 6 more inserts, appended ---
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)dataGen.generateInserts("002", Predef$.MODULE$.int2Integer(6)))).asScala()).toList();
        Dataset inputDf2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, Predef$.MODULE$.Integer2int(this.parallelism()), ClassTag$.MODULE$.apply(String.class))).cache();
        inputDf2.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation).mode(SaveMode.Append).save(this.basePath());
        // Three writes must have produced exactly three commits after "000".
        Assertions.assertEquals((int)3, (int)HoodieDataSourceHelpers.listCommitsSince((FileSystem)fs, (String)this.basePath(), (String)"000").size());
        Dataset snapshotDf3 = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(this.basePath()).cache();
        Assertions.assertEquals((long)21L, (long)snapshotDf3.count());
        this.compareEntireInputDfWithHudiDf((Dataset<Row>)inputDf1.union(inputDf0).union(inputDf2), (Dataset<Row>)snapshotDf3, colsToSelect);
        snapshotDf3.unpersist(true);
        // Release the cached input batches. Not reached if an assertion above fails.
        inputDf0.unpersist(true);
        inputDf1.unpersist(true);
        inputDf2.unpersist(true);
    }

    /**
     * Asserts that a Hudi read reflects an update batch and leaves all other rows as they were.
     *
     * <p>Two checks are made on the {@code colsToCompare} projection:
     * <ol>
     *   <li>every (distinct) row of {@code inputDf} is present in {@code hudiDf};</li>
     *   <li>every row of {@code hudiDf} that is not in {@code inputDf} is one of {@code beforeRows}.</li>
     * </ol>
     *
     * <p>Side effect: (re)registers the session temp views {@code hudiTbl}, {@code inputTbl}
     * and {@code beforeTbl}.
     *
     * @param inputDf       the update batch that was written
     * @param hudiDf        the table contents read back after the write
     * @param beforeRows    rows read from the table before the write; interpreted with {@code hudiDf}'s schema
     * @param colsToCompare comma-separated column list spliced verbatim into the SELECT statements
     */
    public void compareUpdateDfWithHudiDf(Dataset<Row> inputDf, Dataset<Row> hudiDf, List<Row> beforeRows, String colsToCompare) {
        // Hudi meta columns are not part of the input data, so drop them before registering the view.
        Dataset hudiWithoutMetaDf = hudiDf.drop((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD}));
        // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
        hudiWithoutMetaDf.createOrReplaceTempView("hudiTbl");
        inputDf.createOrReplaceTempView("inputTbl");
        Dataset beforeDf = this.spark().createDataFrame((java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter(beforeRows).asJava(), hudiDf.schema());
        beforeDf.createOrReplaceTempView("beforeTbl");
        Dataset hudiDfToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from hudiTbl");
        Dataset inputDfToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from inputTbl");
        Dataset beforeDfToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from beforeTbl");
        // Every updated row must be visible in the Hudi read.
        Assertions.assertEquals(inputDfToCompare.count(), hudiDfToCompare.intersect(inputDfToCompare).count());
        // Whatever is left after removing the updates must be unchanged "before" rows.
        // Compared as exact longs: row counts are integral, so no float conversion or delta is needed.
        Assertions.assertEquals(0L, hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count());
    }

    /**
     * Row-list variant of {@code compareEntireInputDfWithHudiDf}.
     *
     * <p>Wraps {@code inputRows} in a DataFrame that uses {@code hudiDf}'s schema and hands
     * both frames to the DataFrame-based comparison.
     *
     * @param inputRows     expected rows; they are interpreted with {@code hudiDf}'s schema
     * @param hudiDf        the table contents to verify
     * @param colsToCompare comma-separated column list forwarded to the comparison
     */
    public void compareEntireInputRowsWithHudiDf(List<Row> inputRows, Dataset<Row> hudiDf, String colsToCompare) {
        // Convert the Scala list to a java.util.List, as required by createDataFrame.
        java.util.List expectedRows = (java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter(inputRows).asJava();
        Dataset expectedDf = this.spark().createDataFrame(expectedRows, hudiDf.schema());
        this.compareEntireInputDfWithHudiDf((Dataset<Row>)expectedDf, hudiDf, colsToCompare);
    }

    /**
     * Asserts that a Hudi read matches the input data on the {@code colsToCompare} projection.
     *
     * <p>Two checks are made:
     * <ol>
     *   <li>the number of rows common to both frames equals the input row count;</li>
     *   <li>{@code hudiDf} contains no row that is absent from {@code inputDf}.</li>
     * </ol>
     *
     * <p>Side effect: (re)registers the session temp views {@code hudiTbl} and {@code inputTbl}.
     *
     * @param inputDf       the expected data
     * @param hudiDf        the table contents read back
     * @param colsToCompare comma-separated column list spliced verbatim into the SELECT statements
     */
    public void compareEntireInputDfWithHudiDf(Dataset<Row> inputDf, Dataset<Row> hudiDf, String colsToCompare) {
        // Hudi meta columns are not part of the input data, so drop them before registering the view.
        Dataset hudiWithoutMetaDf = hudiDf.drop((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD}));
        // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
        hudiWithoutMetaDf.createOrReplaceTempView("hudiTbl");
        inputDf.createOrReplaceTempView("inputTbl");
        Dataset hudiDfToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from hudiTbl");
        Dataset inputDfToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from inputTbl");
        // Every input row must be visible in the Hudi read ...
        Assertions.assertEquals(inputDfToCompare.count(), hudiDfToCompare.intersect(inputDfToCompare).count());
        // ... and the Hudi read must not contain anything else.
        Assertions.assertEquals(0L, hudiDfToCompare.except(inputDfToCompare).count());
    }

    /**
     * Reads the table at {@code basePath()} with the read-optimized query type and checks that
     * the result matches {@code inputDf}.
     *
     * @param inputDf                 the data the read-optimized view is expected to contain
     * @param colsToSelect            comma-separated column list forwarded to the comparison
     * @param isMetadataEnabledOnRead value passed for the metadata-enable read option
     */
    public void doMORReadOptimizedQuery(Dataset<Row> inputDf, String colsToSelect, boolean isMetadataEnabledOnRead) {
        String queryTypeKey = DataSourceReadOptions$.MODULE$.QUERY_TYPE().key();
        String readOptimizedQuery = DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL();
        Dataset readOptimizedDf = this.spark().read()
                .format("org.apache.hudi")
                .option(queryTypeKey, readOptimizedQuery)
                .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
                .load(this.basePath());
        this.compareEntireInputDfWithHudiDf(inputDf, (Dataset<Row>)readOptimizedDf, colsToSelect);
    }

    /**
     * Asserts that the read-optimized view of a table is contained in its default read.
     *
     * <p>The table is loaded twice: once with the read-optimized query type ("RO") and once
     * without an explicit query type ("RT" — presumably the snapshot/real-time view; confirm
     * against the reader's default). On the {@code colsToCompare} projection the method checks:
     * <ol>
     *   <li>the number of rows common to both reads equals the RO row count;</li>
     *   <li>RO contains no row that is absent from RT.</li>
     * </ol>
     * Rows present only in RT are not checked.
     *
     * <p>Side effect: (re)registers the session temp views {@code hudiTbl1} and {@code hudiTbl2}.
     *
     * @param basePath                location of the table to read
     * @param colsToCompare           comma-separated column list spliced verbatim into the SELECT statements
     * @param isMetadataEnabledOnRead value passed for the metadata-enable read option
     */
    public void compareROAndRT(String basePath, String colsToCompare, boolean isMetadataEnabledOnRead) {
        Dataset roDf = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL()).load(basePath);
        Dataset rtDf = this.spark().read().format("org.apache.hudi").option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead).load(basePath);
        // Hudi meta columns are excluded from the comparison.
        Dataset hudiWithoutMeta1Df = roDf.drop((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD}));
        Dataset hudiWithoutMeta2Df = rtDf.drop((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD}));
        // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
        hudiWithoutMeta1Df.createOrReplaceTempView("hudiTbl1");
        hudiWithoutMeta2Df.createOrReplaceTempView("hudiTbl2");
        Dataset hudiDf1ToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from hudiTbl1");
        Dataset hudiDf2ToCompare = this.spark().sqlContext().sql("select " + colsToCompare + " from hudiTbl2");
        // Every RO row must also be returned by the default read ...
        Assertions.assertEquals(hudiDf1ToCompare.count(), hudiDf1ToCompare.intersect(hudiDf2ToCompare).count());
        // ... and RO must not contain anything the default read lacks.
        Assertions.assertEquals(0L, hudiDf1ToCompare.except(hudiDf2ToCompare).count());
    }
}

