/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional.cdc;

import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.QuickstartUtils;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.functional.cdc.HoodieCDCTestBase;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import scala.Array$;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001da\u0001B\u0005\u000b\u0001UAQA\u0007\u0001\u0005\u0002mAQ!\b\u0001\u0005\u0002yAQ\u0001\u0012\u0001\u0005\u0002\u0015CQ!\u0013\u0001\u0005\u0002)CQa\u001b\u0001\u0005\u00021DQ\u0001\u001d\u0001\u0005\u0002EDQ!\u001f\u0001\u0005\u0002iDQA \u0001\u0005\u0002}\u0014Q\u0003V3ti\u000e#5\tR1uC\u001a\u0013\u0018-\\3Tk&$XM\u0003\u0002\f\u0019\u0005\u00191\rZ2\u000b\u00055q\u0011A\u00034v]\u000e$\u0018n\u001c8bY*\u0011q\u0002E\u0001\u0005QV$\u0017N\u0003\u0002\u0012%\u00051\u0011\r]1dQ\u0016T\u0011aE\u0001\u0004_J<7\u0001A\n\u0003\u0001Y\u0001\"a\u0006\r\u000e\u0003)I!!\u0007\u0006\u0003#!{w\u000eZ5f\u0007\u0012\u001bE+Z:u\u0005\u0006\u001cX-\u0001\u0004=S:LGO\u0010\u000b\u00029A\u0011q\u0003A\u0001\u0017i\u0016\u001cHoQ(X\t\u0006$\u0018mU8ve\u000e,wK]5uKR\u0011q$\n\t\u0003A\rj\u0011!\t\u0006\u0002E\u0005)1oY1mC&\u0011A%\t\u0002\u0005+:LG\u000fC\u0003'\u0005\u0001\u0007q%A\u0006m_\u001e<\u0017N\\4N_\u0012,\u0007C\u0001\u0015/\u001b\u0005I#BA\u0006+\u0015\tYC&A\u0003uC\ndWM\u0003\u0002.\u001d\u000511m\\7n_:L!aL\u0015\u0003A!{w\u000eZ5f\u0007\u0012\u001b5+\u001e9qY\u0016lWM\u001c;bY2{wmZ5oO6{G-\u001a\u0015\u0005\u0005Ejd\b\u0005\u00023w5\t1G\u0003\u00025k\u0005A\u0001O]8wS\u0012,'O\u0003\u00027o\u00051\u0001/\u0019:b[NT!\u0001O\u001d\u0002\u000f),\b/\u001b;fe*\u0011!HE\u0001\u0006UVt\u0017\u000e^\u0005\u0003yM\u0012!\"\u00128v[N{WO]2f\u0003\u00151\u0018\r\\;fG\u00059\u0003F\u0001\u0002A!\t\t%)D\u00016\u0013\t\u0019UGA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fa\u0003^3ti6{%\u000bR1uCN{WO]2f/JLG/\u001a\u000b\u0003?\u0019CQAJ\u0002A\u0002\u001dBCaA\u0019>}!\u00121\u0001Q\u0001&i\u0016\u001cH\u000fR1uCN{WO]2f/JLG/Z,ji\"\u0004\u0016M\u001d;ji&|gNR5fY\u0012$2aH&Y\u0011\u0015aE\u00011\u0001N\u0003%!\u0018M\u00197f)f\u0004X\r\u0005\u0002O+:\u0011qj\u0015\t\u0003!\u0006j\u0011!\u0015\u0006\u0003%R\ta\u0001\u0010:p_Rt\u0014B\u0001+\"\u0003\u0019\u0001&/\u001a3fM&\u0011ak\u0016\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005Q\u000b\u0003\"\u0002\u0014\u0
005\u0001\u0004i\u0005\u0006\u0002\u0003[{u\u0003\"AM.\n\u0005q\u001b$!C\"tmN{WO]2fY\u0019q\u0006M\u00193gQ\u0006\nq,A\u0010D\u001fBKvl\u0014(`/JKE+\u0012\u0017eCR\fwLY3g_J,w,\u00194uKJ\f\u0013!Y\u0001 \u001b\u0016\u0013v)R0P\u001d~\u0013V)\u0011#-I\u0006$\u0018m\u00182fM>\u0014XmX1gi\u0016\u0014\u0018%A2\u00023\r{\u0005+W0P\u001d~;&+\u0013+FY\u0011\fG/Y0cK\u001a|'/Z\u0011\u0002K\u0006IR*\u0012*H\u000b~{ej\u0018*F\u0003\u0012cC-\u0019;b?\n,gm\u001c:fC\u00059\u0017!G\"P!f{vJT0X%&#V\tL8q?.,\u0017pX8oYf\f\u0013![\u0001\u001a\u001b\u0016\u0013v)R0P\u001d~\u0013V)\u0011#-_B|6.Z=`_:d\u0017\u0010\u000b\u0002\u0005\u0001\u0006\tC/Z:u\u0007\u0012\u001bu+\u001b;i\u001bVdG/\u001b\"m_\u000e\\7/\u00118e\u0019><g)\u001b7fgR\u0011q$\u001c\u0005\u0006M\u0015\u0001\ra\n\u0015\u0005\u000bEjd\b\u000b\u0002\u0006\u0001\u0006AB/Z:u\u0007\u0012\u001bu+\u001b;i\u0003^\u001bF)T*QCfdw.\u00193\u0015\u0003}A#AB:\u0011\u0005Q<X\"A;\u000b\u0005Y<\u0014aA1qS&\u0011\u00010\u001e\u0002\u0005)\u0016\u001cH/\u0001\nuKN$8\tR\"DY\u0016\fgNU3uC&tGCA\u0010|\u0011\u00151s\u00011\u0001(Q\u00119\u0011'\u0010 )\u0005\u001d\u0001\u0015\u0001\f;fgR\u001cEiQ,iK:4\u0015N]:u/JLG/Z\"p]R\f\u0017N\\:VaN,'\u000f^!oI\u0012+G.\u001a;f)\ry\u0012\u0011\u0001\u0005\u0006M!\u0001\ra\n\u0015\u0005\u0011Ejd\b\u000b\u0002\t\u0001\u0002")
public class TestCDCDataFrameSuite
extends HoodieCDCTestBase {
    /**
     * Writes a sequence of commits through the Spark datasource using the common options plus
     * the given CDC supplemental logging mode (no table type option is set here), and after each
     * step checks the insert/update/delete counts returned by CDC queries.
     *
     * <p>Sequence of writes: 100 inserts (overwrite), 50 unique updates, 20 deletes with inline
     * clustering, an insert-overwrite-table of 50 records, inserts of 7 and 3 records, 30 unique
     * updates with cleaning/archival options, and a bulk insert of 20 records. The test ends by
     * expecting a {@link HoodieException} when CDC is queried from just before the first commit.
     *
     * <p>Statement order matters: each write changes the table state that later reads and
     * timeline reloads depend on.
     *
     * @param loggingMode supplemental logging mode stored under
     *                    {@code HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE} and used to derive
     *                    the CDC schema
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    public void testCOWDataSourceWrite(HoodieCDCSupplementalLoggingMode loggingMode) {
        // Common options plus the CDC supplemental logging mode under test.
        scala.collection.immutable.Map options = this.commonOpts().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), (Object)loggingMode.name())})));
        // Running totals compared against the "all visible" CDC query after each step.
        long totalInsertedCnt = 0L;
        long totalUpdatedCnt = 0L;
        long totalDeletedCnt = 0L;
        Dataset<Row> allVisibleCDCData = this.spark().emptyDataFrame();

        // Commit 1: 100 inserts, written with SaveMode.Overwrite.
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF1.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Overwrite).save(this.basePath);
        this.metaClient = this.createMetaClient(this.spark(), this.basePath);
        // Resolve the table schema and derive the CDC schema for the logging mode.
        TableSchemaResolver schemaResolver = new TableSchemaResolver(this.metaClient);
        Schema dataSchema = schemaResolver.getTableAvroSchema(false);
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode((HoodieCDCSupplementalLoggingMode)loggingMode, (Schema)dataSchema);
        totalInsertedCnt += 100L;
        HoodieInstant instant1 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        // The first (insert-only) commit is expected to have no CDC log file.
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant1));
        String commitTime1 = instant1.requestedTime();
        // Query starts at (commitTime1 - 1); presumably the start bound is exclusive — confirm in HoodieCDCTestBase.
        Dataset<Row> cdcDataOnly1 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly1, 100L, 0L, 0L);

        // Commit 2: 50 records from generateUniqueUpdates, appended.
        java.util.List hoodieRecords2 = this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(50));
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)hoodieRecords2)).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF2.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant2 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertTrue((boolean)this.hasCDCLogFile(instant2));
        // Read every CDC log file of commit 2 and validate its records against the written updates.
        List cdcDataFromCDCLogFile2 = (List)this.getCDCLogFile(instant2).flatMap((Function1 & Serializable & scala.Serializable)x$1 -> this.readCDCLogFile((String)x$1, cdcSchema), List$.MODULE$.canBuildFrom());
        // JUnit's contract is assertEquals(expected, actual); expected value goes first.
        Assertions.assertEquals(50, cdcDataFromCDCLogFile2.size());
        this.checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema, (Seq<HoodieRecord<?>>)cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE);
        String commitTime2 = instant2.requestedTime();
        Dataset currentSnapshotData = this.spark().read().format("hudi").load(this.basePath);
        // Any growth of the snapshot beyond the initial 100 rows is counted as inserts; the rest of the 50 as updates.
        long insertedCnt2 = currentSnapshotData.count() - 100L;
        long updatedCnt2 = 50L - insertedCnt2;
        Dataset<Row> cdcDataOnly2 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime2)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0L);
        totalUpdatedCnt += updatedCnt2;
        totalInsertedCnt += insertedCnt2;

        // Commit 3: delete 20 records, with inline clustering enabled after every commit.
        List records3 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.deleteRecordsToStrings((java.util.List)this.dataGen.generateUniqueDeletes(Predef$.MODULE$.int2Integer(20)))).asScala()).toList();
        Dataset inputDF3 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records3, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF3.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.DELETE_OPERATION_OPT_VAL()).option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "1").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant3 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        String commitTime3 = instant3.requestedTime();
        // Snapshot loaded here is counted later to derive the deletes expected from the table overwrite.
        currentSnapshotData = this.spark().read().format("hudi").load(this.basePath);
        Dataset<Row> cdcDataOnly3 = this.cdcDataFrame(commitTime2, this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly3, 0L, 0L, 20L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt += 20L);
        // Bounded query from commitTime1 to commitTime3: expects the changes of commits 2 and 3 only.
        Dataset<Row> cdcDataFrom2To3 = this.cdcDataFrame(commitTime1, commitTime3);
        this.assertCDCOpCnt(cdcDataFrom2To3, insertedCnt2, updatedCnt2, 20L);

        // Commit 4: insert-overwrite-table with 50 new records.
        List records4 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("003", Predef$.MODULE$.int2Integer(50)))).asScala()).toList();
        Dataset inputDF4 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records4, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF4.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant4 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant4));
        String commitTime4 = instant4.requestedTime();
        Dataset<Row> cdcDataOnly4 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime4)).toLong() - 1L), this.cdcDataFrame$default$2());
        int insertedCnt4 = 50;
        // Expected deletes: the row count of the snapshot DataFrame loaded after commit 3.
        long deletedCnt4 = currentSnapshotData.count();
        this.assertCDCOpCnt(cdcDataOnly4, insertedCnt4, 0L, deletedCnt4);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += (long)insertedCnt4, totalUpdatedCnt, totalDeletedCnt += deletedCnt4);

        // Commits 5 and 6: plain appends of 7 and 3 inserts.
        List records5 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("005", Predef$.MODULE$.int2Integer(7)))).asScala()).toList();
        Dataset inputDF5 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records5, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF5.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath);
        List records6 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("006", Predef$.MODULE$.int2Integer(3)))).asScala()).toList();
        Dataset inputDF6 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records6, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF6.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath);

        // Commit 7: 30 unique updates, written with automatic cleaning and commit-retention options.
        List records7 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUniqueUpdates("007", Predef$.MODULE$.int2Integer(30)))).asScala()).toList();
        Dataset inputDF7 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records7, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF7.write().format("org.apache.hudi").options((Map)options).option("hoodie.clean.automatic", "true").option("hoodie.keep.min.commits", "4").option("hoodie.keep.max.commits", "5").option("hoodie.clean.commits.retained", "3").mode(SaveMode.Append).save(this.basePath);
        // Uses the commits timeline here rather than the full active timeline used for other instants.
        HoodieInstant instant7 = (HoodieInstant)this.metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get();
        Dataset<Row> cdcDataOnly7 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(instant7.requestedTime())).toLong() - 1L), this.cdcDataFrame$default$2());
        Dataset currentData = this.spark().read().format("hudi").load(this.basePath);
        // 60 = 50 + 7 + 3 rows written by commits 4-6; growth beyond that is counted as inserts.
        long insertedCnt7 = currentData.count() - 60L;
        long updatedCnt7 = 30L - insertedCnt7;
        this.assertCDCOpCnt(cdcDataOnly7, insertedCnt7, updatedCnt7, 0L);
        // Totals are reset and the query window now starts at (commitTime4 - 1).
        // NOTE(review): presumably earlier commits are no longer readable after cleaning — confirm.
        totalInsertedCnt = 60L + insertedCnt7;
        totalUpdatedCnt = updatedCnt7;
        totalDeletedCnt = 0L;
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime4)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt);

        // Commit 8: bulk insert of 20 records; expected to produce no CDC log file.
        List records8 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("008", Predef$.MODULE$.int2Integer(20)))).asScala()).toList();
        Dataset inputDF8 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records8, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF8.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant8 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant8));
        String commitTime8 = instant8.requestedTime();
        Dataset<Row> cdcDataOnly8 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime8)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly8, 20L, 0L, 0L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime4)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += 20L, totalUpdatedCnt, totalDeletedCnt);

        // Querying from before the first commit is expected to fail with HoodieException at this point.
        Assertions.assertThrows(HoodieException.class, () -> this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2()));
    }

    /**
     * Writes a sequence of commits to a table created with the MOR table type option and the
     * given CDC supplemental logging mode, and after each step checks the insert/update/delete
     * counts returned by CDC queries.
     *
     * <p>Sequence of writes: 100 inserts (overwrite), a union of 30 unique updates and 20
     * inserts, 20 deletes with inline compaction, a bulk insert of 100 records, 60 unique updates
     * with inline clustering, an insert-overwrite-table of 70 records, inserts of 7 and 3 records,
     * and finally a rewrite of 30 rows taken from the commit-6 input with cleaning/archival
     * options set.
     *
     * <p>Statement order matters: each write changes the table state that later reads and
     * timeline reloads depend on.
     *
     * @param loggingMode supplemental logging mode stored under
     *                    {@code HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE} and used to derive
     *                    the CDC schema
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    public void testMORDataSourceWrite(HoodieCDCSupplementalLoggingMode loggingMode) {
        // Common options plus the MOR table type and the CDC supplemental logging mode under test.
        scala.collection.immutable.Map options = this.commonOpts().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), (Object)loggingMode.name())})));
        // Running totals compared against the "all visible" CDC query after each step.
        long totalInsertedCnt = 0L;
        long totalUpdatedCnt = 0L;
        long totalDeletedCnt = 0L;
        Dataset<Row> allVisibleCDCData = this.spark().emptyDataFrame();

        // Commit 1: 100 inserts, written with SaveMode.Overwrite.
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF1.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Overwrite).save(this.basePath);
        this.metaClient = this.createMetaClient(this.spark(), this.basePath);
        // Resolve the table schema and derive the CDC schema for the logging mode.
        TableSchemaResolver schemaResolver = new TableSchemaResolver(this.metaClient);
        Schema dataSchema = schemaResolver.getTableAvroSchema(false);
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode((HoodieCDCSupplementalLoggingMode)loggingMode, (Schema)dataSchema);
        totalInsertedCnt += 100L;
        HoodieInstant instant1 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        // The first (insert-only) commit is expected to have no CDC log file.
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant1));
        String commitTime1 = instant1.requestedTime();
        // Query starts at (commitTime1 - 1); presumably the start bound is exclusive — confirm in HoodieCDCTestBase.
        Dataset<Row> cdcDataOnly1 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly1, 100L, 0L, 0L);

        // Commit 2: a single write containing 30 unique updates unioned with 20 new inserts.
        List records2_1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(30)))).asScala()).toList();
        Dataset inputDF2_1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2_1, 2, ClassTag$.MODULE$.apply(String.class)));
        List records2_2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("001", Predef$.MODULE$.int2Integer(20)))).asScala()).toList();
        Dataset inputDF2_2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2_2, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF2_1.union(inputDF2_2).write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant2 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertTrue((boolean)this.hasCDCLogFile(instant2));
        List cdcDataFromCDCLogFile2 = (List)this.getCDCLogFile(instant2).flatMap((Function1 & Serializable & scala.Serializable)x$2 -> this.readCDCLogFile((String)x$2, cdcSchema), List$.MODULE$.canBuildFrom());
        // JUnit's contract is assertEquals(expected, actual); expected values go first.
        Assertions.assertEquals(50, cdcDataFromCDCLogFile2.size());
        // The two predicates live in synthetic helpers outside this view;
        // presumably they select update and insert records respectively — confirm.
        Assertions.assertEquals(30, cdcDataFromCDCLogFile2.count((Function1 & Serializable & scala.Serializable)r -> BoxesRunTime.boxToBoolean((boolean)TestCDCDataFrameSuite.$anonfun$testMORDataSourceWrite$2(r))));
        Assertions.assertEquals(20, cdcDataFromCDCLogFile2.count((Function1 & Serializable & scala.Serializable)r -> BoxesRunTime.boxToBoolean((boolean)TestCDCDataFrameSuite.$anonfun$testMORDataSourceWrite$3(r))));
        String commitTime2 = instant2.requestedTime();
        Dataset currentSnapshotData = this.spark().read().format("hudi").load(this.basePath);
        // Any growth of the snapshot beyond the initial 100 rows is counted as inserts; the rest of the 50 as updates.
        long insertedCnt2 = currentSnapshotData.count() - 100L;
        long updatedCnt2 = 50L - insertedCnt2;
        Dataset<Row> cdcDataOnly2 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime2)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0L);
        totalUpdatedCnt += updatedCnt2;
        totalInsertedCnt += insertedCnt2;

        // Commit 3: delete 20 records, with inline compaction enabled after every delta commit.
        List records3 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.deleteRecordsToStrings((java.util.List)this.dataGen.generateUniqueDeletes(Predef$.MODULE$.int2Integer(20)))).asScala()).toList();
        Dataset inputDF3 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records3, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF3.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.DELETE_OPERATION_OPT_VAL()).option("hoodie.compact.inline", "true").option("hoodie.compact.inline.max.delta.commits", "1").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant3 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        String commitTime3 = instant3.requestedTime();
        currentSnapshotData = this.spark().read().format("hudi").load(this.basePath);
        Dataset<Row> cdcDataOnly3 = this.cdcDataFrame(commitTime2, this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly3, 0L, 0L, 20L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt += 20L);

        // Commit 4: bulk insert of 100 records with inline compaction turned off; no CDC log file expected.
        List records4 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("003", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF4 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records4, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF4.write().format("org.apache.hudi").options((Map)options).option("hoodie.compact.inline", "false").option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.BULK_INSERT_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant4 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant4));
        String commitTime4 = instant4.requestedTime();
        // Row count after commit 4, used as the baseline for commit 5's insert count.
        long cntForInstant4 = this.spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataOnly4 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime4)).toLong() - 1L), this.cdcDataFrame$default$2());
        int insertedCnt4 = 100;
        this.assertCDCOpCnt(cdcDataOnly4, insertedCnt4, 0L, 0L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += (long)insertedCnt4, totalUpdatedCnt, totalDeletedCnt);

        // Commit 5: 60 unique updates with inline clustering on and inline compaction off.
        List records5 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUniqueUpdates("004", Predef$.MODULE$.int2Integer(60)))).asScala()).toList();
        Dataset inputDF5 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records5, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF5.write().format("org.apache.hudi").options((Map)options).option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "1").option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant5 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        String commitTime5 = instant5.requestedTime();
        Dataset<Row> cdcDataOnly5 = this.cdcDataFrame(commitTime4, this.cdcDataFrame$default$2());
        long cntForInstant5 = this.spark().read().format("hudi").load(this.basePath).count();
        // Growth of the table since commit 4 is counted as inserts; the rest of the 60 as updates.
        long insertedCnt5 = cntForInstant5 - cntForInstant4;
        long updatedCnt5 = 60L - insertedCnt5;
        this.assertCDCOpCnt(cdcDataOnly5, insertedCnt5, updatedCnt5, 0L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += insertedCnt5, totalUpdatedCnt += updatedCnt5, totalDeletedCnt);
        // Bounded query from commitTime2 to commitTime4: expects the changes of commits 3 and 4 only.
        Dataset<Row> cdcDataFrom3To4 = this.cdcDataFrame(commitTime2, commitTime4);
        this.assertCDCOpCnt(cdcDataFrom3To4, insertedCnt4, 0L, 20L);

        // Commit 6: insert-overwrite-table with 70 new records; no CDC log file expected.
        List records6 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("005", Predef$.MODULE$.int2Integer(70)))).asScala()).toList();
        Dataset inputDF6 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records6, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF6.write().format("org.apache.hudi").options((Map)options).option("hoodie.compact.inline", "false").option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant6 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant6));
        String commitTime6 = instant6.requestedTime();
        long cntForInstant6 = this.spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataOnly6 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime6)).toLong() - 1L), this.cdcDataFrame$default$2());
        // Expected deletes equal the row count that existed after commit 5.
        this.assertCDCOpCnt(cdcDataOnly6, 70L, 0L, cntForInstant5);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += 70L, totalUpdatedCnt, totalDeletedCnt += cntForInstant5);

        // Commits 7 and 8: plain appends of 7 and 3 inserts (inline compaction off).
        List records7 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("006", Predef$.MODULE$.int2Integer(7)))).asScala()).toList();
        Dataset inputDF7 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records7, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF7.write().format("org.apache.hudi").options((Map)options).option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        totalInsertedCnt += 7L;
        List records8 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("007", Predef$.MODULE$.int2Integer(3)))).asScala()).toList();
        Dataset inputDF8 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records8, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF8.write().format("org.apache.hudi").options((Map)options).option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant8 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        String commitTime8 = instant8.requestedTime();
        totalInsertedCnt += 3L;

        // Commit 9: rewrite 30 rows taken from the commit-6 input, with cleaning and commit-retention options set.
        Dataset inputDF9 = inputDF6.limit(30);
        inputDF9.write().format("org.apache.hudi").options((Map)options).option("hoodie.clean.automatic", "true").option("hoodie.keep.min.commits", "16").option("hoodie.keep.max.commits", "17").option("hoodie.clean.commits.retained", "15").option("hoodie.compact.inline", "false").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant9 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        String commitTime9 = instant9.requestedTime();
        long cntForInstant9 = this.spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataOnly9 = this.cdcDataFrame(commitTime8, this.cdcDataFrame$default$2());
        // 10 = 7 + 3 rows inserted by commits 7 and 8; remaining growth since commit 6 is counted as inserts.
        long insertedCnt9 = cntForInstant9 - cntForInstant6 - 10L;
        long updatedCnt9 = 30L - insertedCnt9;
        this.assertCDCOpCnt(cdcDataOnly9, insertedCnt9, updatedCnt9, 0L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        // The final totals add a fixed 30 updates for commit 9 rather than updatedCnt9.
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt + 30L, totalDeletedCnt);
    }

    @ParameterizedTest
    @CsvSource(value={"COPY_ON_WRITE,data_before_after", "MERGE_ON_READ,data_before_after", "COPY_ON_WRITE,data_before", "MERGE_ON_READ,data_before", "COPY_ON_WRITE,op_key_only", "MERGE_ON_READ,op_key_only"})
    public void testDataSourceWriteWithPartitionField(String tableType, String loggingMode) {
        scala.collection.immutable.Map options = this.commonOpts().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)"partition"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)tableType), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), (Object)loggingMode)})));
        long totalInsertedCnt = 0L;
        long totalUpdatedCnt = 0L;
        long totalDeletedCnt = 0L;
        Dataset<Row> allVisibleCDCData = this.spark().emptyDataFrame();
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF1.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Overwrite).save(this.basePath);
        scala.collection.immutable.Map partitionToCnt = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.spark().read().format("hudi").load(this.basePath).groupBy("partition", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect())).map((Function1 & Serializable & scala.Serializable)row -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)row.getString(0)), (Object)BoxesRunTime.boxToLong((long)row.getLong(1))), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms());
        Predef$.MODULE$.assert(partitionToCnt.contains((Object)"2016/03/15"));
        Predef$.MODULE$.assert(partitionToCnt.contains((Object)"2015/03/16"));
        this.metaClient = this.createMetaClient(this.spark(), this.basePath);
        totalInsertedCnt += 100L;
        HoodieInstant instant1 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant1));
        String commitTime1 = instant1.requestedTime();
        Dataset<Row> cdcDataOnly1 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly1, 100L, 0L, 0L);
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("001", Predef$.MODULE$.int2Integer(30), "2016/03/15"))).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF2.write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OVERWRITE_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant2 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant2));
        String commitTime2 = instant2.requestedTime();
        int insertedCnt2 = 30;
        long deletedCnt2 = BoxesRunTime.unboxToLong((Object)partitionToCnt.apply((Object)"2016/03/15"));
        Dataset<Row> cdcDataOnly2 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime2)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly2, insertedCnt2, 0L, deletedCnt2);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += (long)insertedCnt2, totalUpdatedCnt, totalDeletedCnt += deletedCnt2);
        this.spark().emptyDataFrame().write().format("org.apache.hudi").options((Map)options).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.DELETE_PARTITION_OPERATION_OPT_VAL()).option(DataSourceWriteOptions$.MODULE$.PARTITIONS_TO_DELETE().key(), "2015/03/16").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant3 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        Assertions.assertFalse((boolean)this.hasCDCLogFile(instant3));
        String commitTime3 = instant3.requestedTime();
        long cntForInstant3 = this.spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataOnly3 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime3)).toLong() - 1L), this.cdcDataFrame$default$2());
        long deletedCnt3 = BoxesRunTime.unboxToLong((Object)partitionToCnt.apply((Object)"2015/03/16"));
        this.assertCDCOpCnt(cdcDataOnly3, 0L, 0L, deletedCnt3);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt += deletedCnt3);
        List records4 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUniqueUpdates("000", Predef$.MODULE$.int2Integer(50)))).asScala()).toList();
        Dataset inputDF4 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records4, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF4.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath);
        HoodieInstant instant4 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        String commitTime4 = instant4.requestedTime();
        long cntForInstant4 = this.spark().read().format("hudi").load(this.basePath).count();
        Dataset<Row> cdcDataOnly4 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime4)).toLong() - 1L), this.cdcDataFrame$default$2());
        long insertedCnt4 = cntForInstant4 - cntForInstant3;
        long updatedCnt4 = 50L - insertedCnt4;
        this.assertCDCOpCnt(cdcDataOnly4, insertedCnt4, updatedCnt4, 0L);
        allVisibleCDCData = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime1)).toLong() - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt += insertedCnt4, totalUpdatedCnt += updatedCnt4, totalDeletedCnt);
        Dataset<Row> cdcDataFrom2To3 = this.cdcDataFrame(Long.toString(new StringOps(Predef$.MODULE$.augmentString(commitTime2)).toLong() - 1L), commitTime3);
        this.assertCDCOpCnt(cdcDataFrom2To3, insertedCnt2, 0L, deletedCnt2 + deletedCnt3);
    }

    /**
     * Exercises CDC writes with small log block and log file size limits and checks the CDC
     * records produced by an update commit.
     *
     * <p>Steps: insert 100 records with {@code SaveMode.Overwrite}, upsert 50 unique updates,
     * read every CDC log file of the latest instant back with the logging-mode specific CDC
     * schema, expect exactly 50 CDC records (validated as {@code UPDATE}s), and finally compare
     * the per-operation counts of the CDC data frame for that commit with counts derived from
     * the snapshot size.
     *
     * @param loggingMode CDC supplemental logging mode under test; {@code OP_KEY_ONLY} uses
     *                    smaller limits (256 / 1024) than the other modes (2048 / 5120)
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    public void testCDCWithMultiBlocksAndLogFiles(HoodieCDCSupplementalLoggingMode loggingMode) {
        // Select the limits directly. The previous form stored the tuple in a temporary on only
        // one branch of the conditional and then read that temporary unconditionally.
        boolean opKeyOnly = loggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY;
        int blockSize = opKeyOnly ? 256 : 2048;
        int logFileSize = opKeyOnly ? 1024 : 5120;
        scala.collection.immutable.Map options = this.commonOpts().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key()), (Object)loggingMode.name()),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.logfile.data.block.max.size"), (Object)Integer.toString(blockSize)),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.logfile.max.size"), (Object)Integer.toString(logFileSize))
        })));

        // Commit 1: initial batch of 100 inserts.
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF1.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Overwrite).save(this.basePath);

        // Resolve the table schema and the matching CDC schema once the table exists.
        this.metaClient = this.createMetaClient(this.spark(), this.basePath);
        TableSchemaResolver schemaResolver = new TableSchemaResolver(this.metaClient);
        Schema dataSchema = schemaResolver.getTableAvroSchema(false);
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode((HoodieCDCSupplementalLoggingMode)loggingMode, (Schema)dataSchema);

        // Commit 2: 50 unique updates appended to the table.
        java.util.List hoodieRecords2 = this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(50));
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)hoodieRecords2)).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF2.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Append).save(this.basePath);

        // Read the CDC log files of the latest instant back and validate their content.
        HoodieInstant instant2 = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        List<String> cdcLogFiles2 = this.getCDCLogFile(instant2);
        List cdcDataFromCDCLogFile2 = (List)cdcLogFiles2.flatMap((Function1 & Serializable & scala.Serializable)logFile -> this.readCDCLogFile((String)logFile, cdcSchema), List$.MODULE$.canBuildFrom());
        // JUnit convention: expected value first, actual value second.
        Assertions.assertEquals(50, cdcDataFromCDCLogFile2.size());
        this.checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema, (Seq<HoodieRecord<?>>)cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE);

        // Any growth of the snapshot beyond the initial 100 rows is counted as inserts; the rest
        // of the 50 written records are counted as updates.
        String commitTime2 = instant2.requestedTime();
        Dataset currentSnapshotData = this.spark().read().format("hudi").load(this.basePath);
        long insertedCnt2 = currentSnapshotData.count() - 100L;
        long updatedCnt2 = 50L - insertedCnt2;
        Dataset<Row> cdcDataOnly2 = this.cdcDataFrame(Long.toString(Long.parseLong(commitTime2) - 1L), this.cdcDataFrame$default$2());
        this.assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0L);
    }

    /**
     * Writes to a CDC-enabled, non-partitioned table configured with
     * {@code org.apache.hudi.common.model.AWSDmsAvroPayload} and checks the visible row count.
     *
     * <p>Three rows with {@code Op = "I"} are upserted and the table is expected to hold 3 rows;
     * then one row with {@code Op = "D"} for id {@code "3"} is upserted and the table is
     * expected to hold 2 rows.
     */
    @Test
    public void testCDCWithAWSDMSPayload() {
        scala.collection.immutable.Map options = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.table.name"), (Object)"test"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.recordkey.field"), (Object)"id"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.precombine.field"), (Object)"replicadmstimestamp"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.keygenerator.class"), (Object)"org.apache.hudi.keygen.NonpartitionedKeyGenerator"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.partitionpath.field"), (Object)""),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.payload.class"), (Object)"org.apache.hudi.common.model.AWSDmsAvroPayload"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORD_MERGE_MODE().key()), (Object)RecordMergeMode.CUSTOM.name()),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.table.cdc.enabled"), (Object)"true"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.table.cdc.supplemental.logging.mode"), (Object)"data_before_after")
        }));

        // NOTE(review): `new .colon.colon(head, tail)` looks like the decompiler's rendering of a
        // Scala list cons cell; it is kept exactly as emitted.
        // Tuple layout follows the schema below: (id, Op, replicadmstimestamp, code).
        Seq data = (Seq)new .colon.colon((Object)new Tuple4((Object)"1", (Object)"I", (Object)"2023-06-14 15:46:06.953746", (Object)"A"),
            (List)new .colon.colon((Object)new Tuple4((Object)"2", (Object)"I", (Object)"2023-06-14 15:46:07.953746", (Object)"B"),
            (List)new .colon.colon((Object)new Tuple4((Object)"3", (Object)"I", (Object)"2023-06-14 15:46:08.953746", (Object)"C"),
            (List)Nil$.MODULE$)));
        StructType schema = StructType$.MODULE$.apply((Seq)new .colon.colon((Object)new StructField("id", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("Op", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("replicadmstimestamp", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("code", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            (List)Nil$.MODULE$)))));

        // First write: three rows, all with Op = "I".
        Dataset df = this.spark().createDataFrame((java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)data.map((Function1 & Serializable & scala.Serializable)tuple -> Row$.MODULE$.fromTuple(tuple), Seq$.MODULE$.canBuildFrom())).asJava(), schema);
        df.write().format("org.apache.hudi").option("hoodie.datasource.write.operation", "upsert").options((Map)options).mode("append").save(this.basePath);
        // JUnit convention: expected value first, actual value second.
        Assertions.assertEquals(3L, this.spark().read().format("org.apache.hudi").load(this.basePath).count());

        // Second write: a single row for id "3" with Op = "D"; one fewer row is expected afterwards.
        Seq newData = (Seq)new .colon.colon((Object)new Tuple4((Object)"3", (Object)"D", (Object)"2023-06-14 15:47:09.953746", (Object)"B"), (List)Nil$.MODULE$);
        Dataset newDf = this.spark().createDataFrame((java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)newData.map((Function1 & Serializable & scala.Serializable)tuple -> Row$.MODULE$.fromTuple(tuple), Seq$.MODULE$.canBuildFrom())).asJava(), schema);
        newDf.write().format("org.apache.hudi").option("hoodie.datasource.write.operation", "upsert").options((Map)options).mode("append").save(this.basePath);
        Assertions.assertEquals(2L, this.spark().read().format("org.apache.hudi").load(this.basePath).count());
    }

    /**
     * Checks that the CDC log files written by an early commit are no longer present on the
     * file system after further commits, with {@code hoodie.clean.automatic=true} and
     * {@code hoodie.clean.commits.retained=1}.
     *
     * <p>Sequence: 100 inserts (overwrite), one upsert of 50 unique updates whose CDC log files
     * are recorded and asserted to exist, two more upserts of 50 unique updates each, and a
     * final assertion that the recorded CDC log files no longer exist.
     *
     * @param loggingMode CDC supplemental logging mode under test; also appended to the table name
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    public void testCDCCleanRetain(HoodieCDCSupplementalLoggingMode loggingMode) {
        String modeName = loggingMode.name();
        Tuple2[] optionPairs = new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.table.cdc.enabled"), (Object)"true"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.table.cdc.supplemental.logging.mode"), (Object)modeName),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.insert.shuffle.parallelism"), (Object)"4"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.upsert.shuffle.parallelism"), (Object)"4"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.bulkinsert.shuffle.parallelism"), (Object)"2"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.delete.shuffle.parallelism"), (Object)"1"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.recordkey.field"), (Object)"_row_key"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.datasource.write.precombine.field"), (Object)"timestamp"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.table.name"), (Object)("hoodie_test" + modeName)),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.clean.automatic"), (Object)"true"),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.clean.commits.retained"), (Object)"1")
        };
        scala.collection.immutable.Map options = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])optionPairs));

        // Commit 1: seed the table with 100 inserts.
        java.util.List seedRecords = this.dataGen.generateInserts("000", Integer.valueOf(100));
        List seedJson = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)seedRecords)).asScala()).toList();
        Dataset seedDF = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)seedJson, 2, ClassTag$.MODULE$.apply(String.class)));
        seedDF.write().format("org.apache.hudi").options((Map)options).mode(SaveMode.Overwrite).save(this.basePath);
        this.metaClient = this.createMetaClient(this.spark(), this.basePath);

        // Commit 2: first upsert; remember its CDC log files and confirm they are on disk.
        java.util.List firstUpdates = this.dataGen.generateUniqueUpdates("001", Integer.valueOf(50));
        List firstUpdateJson = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)firstUpdates)).asScala()).toList();
        Dataset firstUpdateDF = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)firstUpdateJson, 2, ClassTag$.MODULE$.apply(String.class)));
        firstUpdateDF.write().format("org.apache.hudi").options((Map)options).option("hoodie.datasource.write.operation", "upsert").mode(SaveMode.Append).save(this.basePath);
        HoodieInstant secondInstant = (HoodieInstant)this.metaClient.reloadActiveTimeline().lastInstant().get();
        List<String> trackedCdcLogFiles = this.getCDCLogFile(secondInstant);
        Assertions.assertTrue((boolean)this.isFilesExistInFileSystem(trackedCdcLogFiles));

        // Commits 3 and 4: two further upserts written with the same options.
        for (String commitLabel : new String[]{"002", "003"}) {
            java.util.List laterUpdates = this.dataGen.generateUniqueUpdates(commitLabel, Integer.valueOf(50));
            List laterUpdateJson = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)laterUpdates)).asScala()).toList();
            Dataset laterUpdateDF = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)laterUpdateJson, 2, ClassTag$.MODULE$.apply(String.class)));
            laterUpdateDF.write().format("org.apache.hudi").options((Map)options).option("hoodie.datasource.write.operation", "upsert").mode(SaveMode.Append).save(this.basePath);
        }

        // The CDC log files recorded after commit 2 must be gone by now.
        Assertions.assertFalse((boolean)this.isFilesExistInFileSystem(trackedCdcLogFiles));
    }

    /**
     * Writes two upsert batches to a MOR table configured with
     * {@code org.apache.hudi.common.model.AWSDmsAvroPayload} and CDC enabled, then verifies
     * three incremental queries in {@code cdc} format.
     *
     * <p>The first batch already mixes operations for the same keys: key "1" has an "I" and a
     * later "U" row, key "2" has an "I" and a later "D" row. The second batch has "U" rows for
     * keys "1" and "2" and an "I" row for key "3".
     *
     * <p>Expected CDC results (insert / update / delete counts, total rows):
     * <ul>
     *   <li>from "0" to the first instant: 1 / 0 / 0, 1 row</li>
     *   <li>from the first instant to the latest instant: 2 / 1 / 0, 3 rows</li>
     *   <li>from "0" to the latest instant: 3 / 1 / 0, 4 rows</li>
     * </ul>
     *
     * @param loggingMode CDC supplemental logging mode under test; also appended to the table name
     */
    @ParameterizedTest
    @EnumSource(value=HoodieCDCSupplementalLoggingMode.class)
    public void testCDCWhenFirstWriteContainsUpsertAndDelete(HoodieCDCSupplementalLoggingMode loggingMode) {
        // NOTE(review): `new .colon.colon(head, tail)` looks like the decompiler's rendering of a
        // Scala list cons cell; it is kept exactly as emitted.
        StructType schema = StructType$.MODULE$.apply((Seq)new .colon.colon((Object)new StructField("_id", (DataType)StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("Op", (DataType)StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("replicadmstimestamp", (DataType)StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("code", (DataType)StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            (List)new .colon.colon((Object)new StructField("partition", (DataType)StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()),
            (List)Nil$.MODULE$))))));
        String tableNameForMode = this.tableName + loggingMode.name();

        // First batch. Row layout: (_id, Op, replicadmstimestamp, code, partition).
        SparkContext qual$1 = this.spark().sparkContext();
        Seq x$1 = (Seq)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"1", "I", "2023-06-14 15:46:06.953746", "A", "A"})),
            (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"1", "U", "2023-06-20 15:46:06.953746", "A", "A"})),
            (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"2", "I", "2023-06-14 15:46:06.953746", "A", "A"})),
            (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"2", "D", "2023-06-20 15:46:06.953746", "A", "A"})),
            (List)Nil$.MODULE$))));
        int x$2 = qual$1.parallelize$default$2();
        RDD rdd1 = qual$1.parallelize(x$1, x$2, ClassTag$.MODULE$.apply(Row.class));
        Dataset df1 = this.spark().createDataFrame(rdd1, schema);
        df1.write().format("hudi")
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .options(QuickstartUtils.getQuickstartWriteConfigs())
            .option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD_OPT_KEY(), "_id")
            .option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD_OPT_KEY(), "replicadmstimestamp")
            .option(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
            .option(HoodieWriteConfig.TBL_NAME.key(), tableNameForMode)
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
            .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
            .option(DataSourceWriteOptions$.MODULE$.RECORD_MERGE_MODE().key(), RecordMergeMode.CUSTOM.name())
            .option("hoodie.table.cdc.enabled", "true")
            .option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
            .mode(SaveMode.Append)
            .save(this.basePath);

        // Second batch, written with exactly the same options as the first.
        SparkContext qual$2 = this.spark().sparkContext();
        Seq x$3 = (Seq)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"1", "U", "2023-06-14 15:46:06.953746", "A", "A"})),
            (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"2", "U", "2023-06-20 15:46:06.953746", "A", "A"})),
            (List)new .colon.colon((Object)Row$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"3", "I", "2023-06-20 15:46:06.953746", "A", "A"})),
            (List)Nil$.MODULE$)));
        int x$4 = qual$2.parallelize$default$2();
        RDD rdd2 = qual$2.parallelize(x$3, x$4, ClassTag$.MODULE$.apply(Row.class));
        Dataset df2 = this.spark().createDataFrame(rdd2, schema);
        df2.write().format("hudi")
            .option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), DataSourceWriteOptions$.MODULE$.MOR_TABLE_TYPE_OPT_VAL())
            .options(QuickstartUtils.getQuickstartWriteConfigs())
            .option(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD_OPT_KEY(), "_id")
            .option(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD_OPT_KEY(), "replicadmstimestamp")
            .option(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
            .option(HoodieWriteConfig.TBL_NAME.key(), tableNameForMode)
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
            .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
            .option(DataSourceWriteOptions$.MODULE$.RECORD_MERGE_MODE().key(), RecordMergeMode.CUSTOM.name())
            .option("hoodie.table.cdc.enabled", "true")
            .option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
            .mode(SaveMode.Append)
            .save(this.basePath);

        // Local meta client, named so that it does not shadow the `metaClient` field.
        HoodieTableMetaClient tableMetaClient = this.createMetaClient(this.spark(), this.basePath);
        String startTimeStamp = ((HoodieInstant)tableMetaClient.reloadActiveTimeline().firstInstant().get()).requestedTime();
        String latestTimeStamp = ((HoodieInstant)tableMetaClient.reloadActiveTimeline().lastInstant().get()).requestedTime();

        // Query 1: from "0" up to the first instant.
        Dataset result1 = this.spark().read().format("hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", "0")
            .option("hoodie.datasource.read.end.instanttime", startTimeStamp)
            .option("hoodie.datasource.query.incremental.format", "cdc")
            .load(this.basePath);
        result1.show(false);
        this.assertCDCOpCnt((Dataset<Row>)result1, 1L, 0L, 0L);
        // JUnit convention: expected value first, actual value second.
        Assertions.assertEquals(1L, result1.count());

        // Query 2: from the first instant up to the latest instant.
        Dataset result2 = this.spark().read().format("hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", startTimeStamp)
            .option("hoodie.datasource.read.end.instanttime", latestTimeStamp)
            .option("hoodie.datasource.query.incremental.format", "cdc")
            .load(this.basePath);
        result2.show(false);
        this.assertCDCOpCnt((Dataset<Row>)result2, 2L, 1L, 0L);
        Assertions.assertEquals(3L, result2.count());

        // Query 3: from "0" up to the latest instant.
        Dataset result3 = this.spark().read().format("hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", "0")
            .option("hoodie.datasource.read.end.instanttime", latestTimeStamp)
            .option("hoodie.datasource.query.incremental.format", "cdc")
            .load(this.basePath);
        result3.show(false);
        this.assertCDCOpCnt((Dataset<Row>)result3, 3L, 1L, 0L);
        Assertions.assertEquals(4L, result3.count());
    }

    /**
     * Compiler-generated predicate body (presumably a lambda from testMORDataSourceWrite).
     *
     * @param r record whose data is read as a {@link GenericRecord}
     * @return {@code true} when the string form of field 0 of the record's data equals "u"
     */
    public static final /* synthetic */ boolean $anonfun$testMORDataSourceWrite$2(HoodieRecord r) {
        GenericRecord payload = (GenericRecord)r.getData();
        String firstField = payload.get(0).toString();
        // Constant-first equals also yields false if toString() were to return null.
        return "u".equals(firstField);
    }

    /**
     * Compiler-generated predicate body (presumably a lambda from testMORDataSourceWrite).
     *
     * @param r record whose data is read as a {@link GenericRecord}
     * @return {@code true} when the string form of field 0 of the record's data equals "i"
     */
    public static final /* synthetic */ boolean $anonfun$testMORDataSourceWrite$3(HoodieRecord r) {
        GenericRecord payload = (GenericRecord)r.getData();
        String firstField = payload.get(0).toString();
        // Constant-first equals also yields false if toString() were to return null.
        return "i".equals(firstField);
    }
}

