/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.Serializable;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.HoodieStreamingSink$;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.hudi.testutils.HoodieSparkDeleteRecordMerger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\r\rc\u0001\u0002\u0013&\u00019BQ!\u000e\u0001\u0005\u0002YBq!\u000f\u0001C\u0002\u0013%!\b\u0003\u0004B\u0001\u0001\u0006Ia\u000f\u0005\n\u0005\u0002\u0001\r\u00111A\u0005\u0002\rC\u0011b\u0013\u0001A\u0002\u0003\u0007I\u0011\u0001'\t\u0013U\u0003\u0001\u0019!A!B\u0013!\u0005b\u0002,\u0001\u0005\u0004%\ta\u0016\u0005\u0007Q\u0002\u0001\u000b\u0011\u0002-\t\u000b%\u0004A\u0011\t6\t\u000bY\u0004A\u0011A<\t\u000f\u0005E\u0002\u0001\"\u0001\u00024!9\u00111\t\u0001\u0005\u0002\u0005\u0015\u0003bBA.\u0001\u0011\u0005\u0011Q\f\u0005\b\u0003o\u0002A\u0011AA=\u0011\u001d\t9\t\u0001C\u0001\u0003\u0013Cq!a%\u0001\t\u0003\t)\nC\u0004\u0002:\u0002!I!a/\t\u000f\t\u0015\u0002\u0001\"\u0001\u0003(!9!Q\b\u0001\u0005\u0002\t}\u0002B\u0002B%\u0001\u0011\u0005!\u000e\u0003\u0004\u0003T\u0001!\tA\u001b\u0005\u0007\u0005/\u0002A\u0011\u00016\t\u000f\tm\u0003\u0001\"\u0001\u0003^!I!1\r\u0001\u0012\u0002\u0013\u0005!Q\r\u0005\b\u0005w\u0002A\u0011\u0001B?\u0011\u001d\u00119\n\u0001C\u0001\u00053CqAa-\u0001\t\u0013\u0011)\fC\u0004\u0003B\u0002!IAa1\t\u000f\t]\u0007\u0001\"\u0003\u0003Z\"9!Q\u001d\u0001\u0005\n\t\u001d\bB\u0002By\u0001\u0011\u0005!\u000eC\u0004\u0003v\u0002!\tAa>\t\u0019\r\r\u0002\u0001%A\u0001\u0002\u0003%\ta!\n\t\u0019\r%\u0002\u0001%A\u0001\u0002\u0003%\taa\u000b\t\u0019\rM\u0002\u0001%A\u0001\u0002\u0003%\ta!\u000e\u0003/Q+7\u000f^*ueV\u001cG/\u001e:fIN#(/Z1nS:<'B\u0001\u0014(\u0003)1WO\\2uS>t\u0017\r\u001c\u0006\u0003Q%\nA\u0001[;eS*\u0011!fK\u0001\u0007CB\f7\r[3\u000b\u00031\n1a\u001c:h\u0007\u0001\u0019\"\u0001A\u0018\u0011\u0005A\u001aT\"A\u0019\u000b\u0005I:\u0013!\u0003;fgR,H/\u001b7t\u0013\t!\u0014GA\rI_>$\u0017.Z*qCJ\\7\t\\5f]R$Vm\u001d;CCN,\u0017A\u0002\u001fj]&$h\bF\u00018!\tA\u0004!D\u0001&\u0003\rawnZ\u000b\u0002wA\u0011AhP\u0007\u0002{)\u0011ahK\u0001\u0006g24GG[\u0005\u0003\u0001v\u0012a\u0001T8hO\u0016\u0014\u0018\u0001\u00027pO\u0002\nQa\u001d9be.,\u0012\u0001\u0012\t\u0003\u000b&k\u0011A\u0012\u0006\u0003\u000f\"\u000b1a]9m\u0015\t\u0011\u0015&\u0003\u0002K\r\na1\u000b]1sWN+7o]5p]\u0006I1\u000f]1sW~#S-\u001d\u000b\u0003\u001bN\u0003\"AT)\u000e\u0003=S\u0011\u0001U\u0001\u0006g\u000e\fG.Y\u0005\u0003%>\u0013A!\u00168ji\"9A+BA\u0001\u0002\u0004!\u0015a\u0001=%c\u000511\u000f]1sW\u0002\n!bY8n[>tw\n\u001d;t+\u0005A\u0006\u0003B-_A\u0002l\u0011A\u0017\u0006\u00037r\u000b\u0011\"[7nkR\f'\r\\3\u000b\u0005u{\u0015AC2pY2,7\r^5p]&\u0011qL\u0017\u0002\u0004\u001b\u0006\u0004\bCA1g\u001b\u0005\u0011'BA2e\u0003\u0011a\u0017M\\4\u000b\u0003\u0015\fAA[1wC&\u0011qM\u0019\u0002\u0007'R\u0014\u0018N\\4\u0002\u0017\r|W.\\8o\u001fB$8\u000fI\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002\u001b\"\u0012\u0011\u0002\u001c\t\u0003[Rl\u0011A\u001c\u0006\u0003_B\f1!\u00199j\u0015\t\t(/A\u0004kkBLG/\u001a:\u000b\u0005M\\\u0013!\u00026v]&$\u0018BA;o\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u001aS:LGo\u0016:ji&twm\u0015;sK\u0006l\u0017N\\4Rk\u0016\u0014\u0018\u0010\u0006\u0005y}\u00065\u0011QEA\u0015!\tIH0D\u0001{\u0015\tYh)A\u0005tiJ,\u0017-\\5oO&\u0011QP\u001f\u0002\u000f'R\u0014X-Y7j]\u001e\fV/\u001a:z\u0011\u0019y(\u00021\u0001\u0002\u0002\u000511o\u00195f[\u0006\u0004B!a\u0001\u0002\n5\u0011\u0011Q\u0001\u0006\u0004\u0003\u000f1\u0015!\u0002;za\u0016\u001c\u0018\u0002BA\u0006\u0003\u000b\u0011!b\u0015;sk\u000e$H+\u001f9f\u0011\u001d\tyA\u0003a\u0001\u0003#\t!b]8ve\u000e,\u0007+\u0019;i!\u0011\t\u0019\"!\t\u000f\t\u0005U\u0011Q\u0004\t\u0004\u0003/yUBAA\r\u0015\r\tY\"L\u0001\u0007yI|w\u000e\u001e \n\u0007\u0005}q*\u0001\u0004Qe\u0016$WMZ\u0005\u0004O\u0006\r\"bAA\u0010\u001f\"9\u0011q\u0005\u0006A\u0002\u0005E\u0011\u0001\u00033fgR\u0004\u0016\r\u001e5\t\u000f\u0005-\"\u00021\u0001\u0002.\u0005Y\u0001.\u001e3j\u001fB$\u0018n\u001c8t!!\t\u0019\"a\f\u0002\u0012\u0005E\u0011bA0\u0002$\u0005q\u0012N\\5u'R\u0014X-Y7j]\u001e\u001cv.\u001e:dK\u0006sG\rR3tiB\u000bG\u000f\u001b\u000b\u0007\u0003k\tY$a\u0010\u0011\u000f9\u000b9$!\u0005\u0002\u0012%\u0019\u0011\u0011H(\u0003\rQ+\b\u000f\\33\u0011\u001d\tid\u0003a\u0001\u0003#\tQb]8ve\u000e,G)\u001b:OC6,\u0007bBA!\u0017\u0001\u0007\u0011\u0011C\u0001\fI\u0016\u001cH\u000fR5s\u001d\u0006lW-\u0001\u000bhKR|\u0005\u000f^:XSRDG+\u00192mKRK\b/\u001a\u000b\u0005\u0003[\t9\u0005C\u0004\u0002J1\u0001\r!a\u0013\u0002\u0013Q\f'\r\\3UsB,\u0007\u0003BA'\u0003/j!!a\u0014\u000b\t\u0005E\u00131K\u0001\u0006[>$W\r\u001c\u0006\u0004\u0003+:\u0013AB2p[6|g.\u0003\u0003\u0002Z\u0005=#a\u0004%p_\u0012LW\rV1cY\u0016$\u0016\u0010]3\u0002#\u001d,Go\u00117vgR,'/\u001b8h\u001fB$8\u000f\u0006\u0007\u0002.\u0005}\u0013\u0011MA3\u0003S\ni\u0007C\u0004\u0002J5\u0001\r!a\u0013\t\u000f\u0005\rT\u00021\u0001\u0002\u0012\u0005\u0011\u0012n]%oY&tWm\u00117vgR,'/\u001b8h\u0011\u001d\t9'\u0004a\u0001\u0003#\t\u0011#[:Bgft7m\u00117vgR,'/\u001b8h\u0011\u001d\tY'\u0004a\u0001\u0003#\t1c\u00197vgR,'/\u001b8h\u001dVl7i\\7nSRDq!a\u001c\u000e\u0001\u0004\t\t(\u0001\tgS2,W*\u0019=SK\u000e|'\u000f\u001a(v[B\u0019a*a\u001d\n\u0007\u0005UtJA\u0002J]R\f\u0011cZ3u\u0007>l\u0007/Y2uS>tw\n\u001d;t)\u0019\ti#a\u001f\u0002~!9\u0011\u0011\n\bA\u0002\u0005-\u0003bBA@\u001d\u0001\u0007\u0011\u0011Q\u0001\u0012SN\f5/\u001f8d\u0007>l\u0007/Y2uS>t\u0007c\u0001(\u0002\u0004&\u0019\u0011QQ(\u0003\u000f\t{w\u000e\\3b]\u0006i2\u000f\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oOR+7\u000f\u001e*v]:,'\u000fF\u0004N\u0003\u0017\u000bi)!%\t\u000f\u0005%s\u00021\u0001\u0002L!9\u0011qR\bA\u0002\u0005\u0005\u0015\u0001F1eI\u000e{W\u000e]1di&|gnQ8oM&<7\u000fC\u0004\u0002\u0000=\u0001\r!!!\u0002/Q,7\u000f^*ueV\u001cG/\u001e:fIN#(/Z1nS:<GcA'\u0002\u0018\"9\u0011\u0011\n\tA\u0002\u0005-\u0003f\u0002\t\u0002\u001c\u0006-\u0016Q\u0016\t\u0005\u0003;\u000b9+\u0004\u0002\u0002 *!\u0011\u0011UAR\u0003!\u0001(o\u001c<jI\u0016\u0014(bAASa\u00061\u0001/\u0019:b[NLA!!+\u0002 \nQQI\\;n'>,(oY3\u0002\u000bY\fG.^3$\u0005\u0005-\u0003f\u0001\t\u00022B!\u00111WA[\u001b\t\t\u0019+\u0003\u0003\u00028\u0006\r&!\u0005)be\u0006lW\r^3sSj,G\rV3ti\u00069r/Y5u)&dG.\u0011;mK\u0006\u001cHOT\"p[6LGo\u001d\u000b\r\u0003c\ni,a3\u0002P\u0006M\u0017q\u001b\u0005\b\u0003\u007f\u000b\u0002\u0019AAa\u0003\u001d\u0019Ho\u001c:bO\u0016\u0004B!a1\u0002H6\u0011\u0011Q\u0019\u0006\u0004\u0003\u007f;\u0013\u0002BAe\u0003\u000b\u0014Q\u0002S8pI&,7\u000b^8sC\u001e,\u0007bBAg#\u0001\u0007\u0011\u0011C\u0001\ni\u0006\u0014G.\u001a)bi\"Dq!!5\u0012\u0001\u0004\t\t(\u0001\u0006ok6\u001cu.\\7jiNDq!!6\u0012\u0001\u0004\t\t(A\u0006uS6,w.\u001e;TK\u000e\u001c\bbBAm#\u0001\u0007\u0011\u0011O\u0001\u0016g2,W\r]*fGN\fe\r^3s\u000b\u0006\u001c\u0007NU;oQ\u0015\t\u0012Q\\A{!\u0015q\u0015q\\Ar\u0013\r\t\to\u0014\u0002\u0007i\"\u0014xn^:\u0011\t\u0005\u0015\u0018q\u001e\b\u0005\u0003O\fYO\u0004\u0003\u0002\u0018\u0005%\u0018\"\u0001)\n\u0007\u00055x*A\u0004qC\u000e\\\u0017mZ3\n\t\u0005E\u00181\u001f\u0002\u0015\u0013:$XM\u001d:vaR,G-\u0012=dKB$\u0018n\u001c8\u000b\u0007\u00055x*M\u0004\u001f\u0003#\t9Pa\t2\u0013\r\nIP!\u0001\u0003\u001a\t\rQ\u0003BA~\u0003{,\"!!\u0005\u0005\u000f\u0005}XF1\u0001\u0003\n\t\tA+\u0003\u0003\u0003\u0004\t\u0015\u0011a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$\u0013GC\u0002\u0003\b=\u000ba\u0001\u001e5s_^\u001c\u0018\u0003\u0002B\u0006\u0005#\u00012A\u0014B\u0007\u0013\r\u0011ya\u0014\u0002\b\u001d>$\b.\u001b8h!\u0011\u0011\u0019B!\u0006\u000f\u00079\u000bY/\u0003\u0003\u0003\u0018\u0005M(!\u0003+ie><\u0018M\u00197fc%\u0019#1\u0004B\u000f\u0005?\u00119AD\u0002O\u0005;I1Aa\u0002Pc\u0015\u0011cj\u0014B\u0011\u0005\u0015\u00198-\u00197bc\r1\u00131]\u0001&i\u0016\u001cHo\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5DYV\u001cH/\u001a:j]\u001e$2!\u0014B\u0015\u0011\u001d\t9G\u0005a\u0001\u0003\u0003CsA\u0005B\u0017\u0005g\u0011)\u0004\u0005\u0003\u0002\u001e\n=\u0012\u0002\u0002B\u0019\u0003?\u00131BV1mk\u0016\u001cv.\u001e:dK\u0006A!m\\8mK\u0006t7\u000f\f\u0003\u00038\te\u0012$A\u0001\u001a\u0003\u0001A3AEAY\u0003\u0015\"Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&twmV5uQ\u000e{W\u000e]1di&|g\u000eF\u0002N\u0005\u0003Bq!a \u0014\u0001\u0004\t\t\tK\u0004\u0014\u0005[\u0011\u0019D!\u0012-\t\t]\"\u0011\b\u0015\u0004'\u0005E\u0016!\n;fgR\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h/&$\bn\u00115fG.\u0004x.\u001b8uQ\r!\"Q\n\t\u0004[\n=\u0013b\u0001B)]\n!A+Z:u\u0003-\"Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&twMR8s\t\u00164\u0017-\u001e7u\u0013\u0012,g\u000e^5gS\u0016\u0014\bfA\u000b\u0003N\u0005)C/Z:u'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,ji\"\u0014U\u000f\\6J]N,'\u000f\u001e\u0015\u0004-\t5\u0013a\b;fgR\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h\u0013:$XM\u001d8bYR\u0019QJa\u0018\t\u0013\t\u0005t\u0003%AA\u0002\u0005E\u0011!C8qKJ\fG/[8o\u0003%\"Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&tw-\u00138uKJt\u0017\r\u001c\u0013eK\u001a\fW\u000f\u001c;%cU\u0011!q\r\u0016\u0005\u0003#\u0011Ig\u000b\u0002\u0003lA!!Q\u000eB<\u001b\t\u0011yG\u0003\u0003\u0003r\tM\u0014!C;oG\",7m[3e\u0015\r\u0011)hT\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002\u0002B=\u0005_\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003\u0005\n7o]3si2\u000bG/Z:u\u0007\",7m\u001b9pS:$\u0018J\u001c4p\u001b\u0006$8\r[3e)\u001di%q\u0010BH\u0005'CqA!!\u001a\u0001\u0004\u0011\u0019)\u0001\u0006nKR\f7\t\\5f]R\u0004BA!\"\u0003\f6\u0011!q\u0011\u0006\u0005\u0005\u0013\u000b\u0019&A\u0003uC\ndW-\u0003\u0003\u0003\u000e\n\u001d%!\u0006%p_\u0012LW\rV1cY\u0016lU\r^1DY&,g\u000e\u001e\u0005\b\u0005#K\u0002\u0019AA\t\u0003)IG-\u001a8uS\u001aLWM\u001d\u0005\b\u0005+K\u0002\u0019AA\t\u00035)\u0007\u0010]3di\n\u000bGo\u00195JI\u0006Q3\u000f\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO\u001a{'\u000fV3ti\u000ecWo\u001d;fe&twMU;o]\u0016\u0014HcD'\u0003\u001c\nu%q\u0014BQ\u0005G\u0013)K!+\t\u000f\u0005=!\u00041\u0001\u0002\u0012!9\u0011q\u0005\u000eA\u0002\u0005E\u0001bBA%5\u0001\u0007\u00111\n\u0005\b\u0003GR\u0002\u0019AAA\u0011\u001d\t9G\u0007a\u0001\u0003\u0003CqAa*\u001b\u0001\u0004\t\t\"\u0001\nqCJ$\u0018\u000e^5p]>3'+Z2pe\u0012\u001c\bb\u0002BV5\u0001\u0007!QV\u0001\u0016G\",7m[\"mkN$XM]5oOJ+7/\u001e7u!\u0019q%qVA\t\u001b&\u0019!\u0011W(\u0003\u0013\u0019+hn\u0019;j_:\f\u0014!G4fi2\u000bG/Z:u\r&dWm\u0012:pkB\u001ch)\u001b7f\u0013\u0012$BAa.\u0003>B)aJ!/\u0002\u0012%\u0019!1X(\u0003\u000b\u0005\u0013(/Y=\t\u000f\t}6\u00041\u0001\u0002\u0012\u0005I\u0001/\u0019:uSRLwN\\\u0001#o\u0006LG\u000fV5mY\"\u000b7oQ8na2,G/\u001a3SKBd\u0017mY3J]N$\u0018M\u001c;\u0015\u000f5\u0013)Ma2\u0003J\"9\u0011Q\u001a\u000fA\u0002\u0005E\u0001bBAk9\u0001\u0007\u0011\u0011\u000f\u0005\b\u00033d\u0002\u0019AA9Q\u0015a\u0012Q\u001cBgc\u001dq\u0012\u0011\u0003Bh\u0005+\f\u0014bIA}\u0005\u0003\u0011\tNa\u00012\u0013\r\u0012YB!\b\u0003T\n\u001d\u0011'\u0002\u0012O\u001f\n\u0005\u0012g\u0001\u0014\u0002d\u0006iA.\u0019;fgRLen\u001d;b]R$\u0002\"!\u0005\u0003\\\nu'\u0011\u001d\u0005\b\u0003\u007fk\u0002\u0019AAa\u0011\u001d\u0011y.\ba\u0001\u0003#\t\u0001BY1tKB\u000bG\u000f\u001b\u0005\b\u0005Gl\u0002\u0019AA\t\u00035Ign\u001d;b]R\f5\r^5p]\u0006q1\u000f\u001e:fC6LgnZ,sSR,G#C'\u0003j\n-(Q\u001eBx\u0011\u0019yh\u00041\u0001\u0002\u0002!9\u0011q\u0002\u0010A\u0002\u0005E\u0001bBA\u0014=\u0001\u0007\u0011\u0011\u0003\u0005\b\u0003Wq\u0002\u0019AA\u0017\u00035\"Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&twmV5uQ\u0012K7/\u00192mK\u0012\u001cu.\u001c9bGRLwN\u001c\u0015\u0004?\t5\u0013\u0001\n;fgR\u001cFO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h/&$\b.T3sO\u0016lu\u000eZ3\u0015\u000b5\u0013IPa?\t\u000f\u0005%\u0003\u00051\u0001\u0002\u0012!9!Q \u0011A\u0002\u0005E\u0011!C7fe\u001e,Wj\u001c3fQ\u001d\u00013\u0011AAV\u0007\u000f\u0001B!!(\u0004\u0004%!1QAAP\u0005%\u00195O^*pkJ\u001cW\r\f\u0007\u0004\n\r51\u0011CB\u000b\u00073\u0019i\"\t\u0002\u0004\f\u0005\t3i\u0014)Z?>sul\u0016*J)\u0016cSIV#O)~#\u0016*T#`\u001fJ#UIU%O\u000f\u0006\u00121qB\u0001\"\u001b\u0016\u0013v)R0P\u001d~\u0013V)\u0011#-\u000bZ+e\nV0U\u00136+ul\u0014*E\u000bJKejR\u0011\u0003\u0007'\t!eQ(Q3~{ejX,S\u0013R+EfQ(N\u001b&#v\fV%N\u000b~{%\u000bR#S\u0013:;\u0015EAB\f\u0003\tjUIU$F?>suLU#B\t2\u001au*T'J)~#\u0016*T#`\u001fJ#UIU%O\u000f\u0006\u001211D\u0001\u0015\u0007>\u0003\u0016lX(O?^\u0013\u0016\nV#-\u0007V\u001bFkT'\"\u0005\r}\u0011\u0001F'F%\u001e+ul\u0014(`%\u0016\u000bE\tL\"V'R{U\nK\u0002!\u0003c\u000b\u0011\u0003\u001d:pi\u0016\u001cG/\u001a3%gR|'/Y4f)\u0011\t\tma\n\t\u000fQ\u000b\u0013\u0011!a\u0001o\u00059\u0002O]8uK\u000e$X\r\u001a\u0013tKRlW\r^1DY&,g\u000e\u001e\u000b\u0006\u001b\u000e52q\u0006\u0005\b)\n\n\t\u00111\u00018\u0011%\u0019\tDIA\u0001\u0002\u0004\u0011\u0019)A\u0002yII\n\u0011\u0003\u001d:pi\u0016\u001cG/\u001a3%I\u0006$\u0018mR3o)\u0011\u00199d!\u0011\u0011\t\re2QH\u0007\u0003\u0007wQ1AMA*\u0013\u0011\u0019yda\u000f\u0003/!{w\u000eZ5f)\u0016\u001cH\u000fR1uC\u001e+g.\u001a:bi>\u0014\bb\u0002+$\u0003\u0003\u0005\ra\u000e")
public class TestStructuredStreaming
extends HoodieSparkClientTestBase {
    private final Logger log = LoggerFactory.getLogger((Class)this.getClass());
    private SparkSession spark;
    private final scala.collection.immutable.Map<String, String> commonOpts = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.insert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.upsert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), (Object)"_row_key"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)"partition"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), (Object)"timestamp"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieWriteConfig.TBL_NAME.key()), (Object)"hoodie_test")}));

    public /* synthetic */ HoodieStorage protected$storage(TestStructuredStreaming x$1) {
        return x$1.storage;
    }

    public /* synthetic */ void protected$setmetaClient(TestStructuredStreaming x$1, HoodieTableMetaClient x$2) {
        x$1.metaClient = x$2;
    }

    public /* synthetic */ HoodieTestDataGenerator protected$dataGen(TestStructuredStreaming x$1) {
        return x$1.dataGen;
    }

    private Logger log() {
        return this.log;
    }

    public SparkSession spark() {
        return this.spark;
    }

    public void spark_$eq(SparkSession x$1) {
        this.spark = x$1;
    }

    public scala.collection.immutable.Map<String, String> commonOpts() {
        return this.commonOpts;
    }

    @BeforeEach
    public void setUp() {
        super.setUp();
        this.spark_$eq(this.sqlContext.sparkSession());
        this.spark().conf().set("spark.sql.streaming.stopTimeout", 30000L);
    }

    public StreamingQuery initWritingStreamingQuery(StructType schema, String sourcePath, String destPath, scala.collection.immutable.Map<String, String> hudiOptions) {
        Dataset streamingInput = this.spark().readStream().schema(schema).json(sourcePath);
        return streamingInput.writeStream().format("org.apache.hudi").options(hudiOptions).trigger(Trigger.ProcessingTime((long)1000L)).option("checkpointLocation", new StringBuilder(11).append(this.basePath).append("/checkpoint").toString()).outputMode(OutputMode.Append()).start(destPath);
    }

    public Tuple2<String, String> initStreamingSourceAndDestPath(String sourceDirName, String destDirName) {
        this.storage.deleteDirectory(new StoragePath(this.basePath));
        String sourcePath = new StringBuilder(1).append(this.basePath).append("/").append(sourceDirName).toString();
        String destPath = new StringBuilder(1).append(this.basePath).append("/").append(destDirName).toString();
        this.storage.createDirectory(new StoragePath(sourcePath));
        return new Tuple2((Object)sourcePath, (Object)destPath);
    }

    public scala.collection.immutable.Map<String, String> getOptsWithTableType(HoodieTableType tableType) {
        return this.commonOpts().$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)tableType.name()));
    }

    public scala.collection.immutable.Map<String, String> getClusteringOpts(HoodieTableType tableType, String isInlineClustering, String isAsyncClustering, String clusteringNumCommit, int fileMaxRecordNum) {
        return this.getOptsWithTableType(tableType).$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.INLINE_CLUSTERING.key()), (Object)isInlineClustering), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key()), (Object)clusteringNumCommit), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.ASYNC_CLUSTERING_ENABLE().key()), (Object)isAsyncClustering), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key()), (Object)clusteringNumCommit), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key()), (Object)Integer.toString(this.dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum)))})));
    }

    public scala.collection.immutable.Map<String, String> getCompactionOpts(HoodieTableType tableType, boolean isAsyncCompaction) {
        return this.getOptsWithTableType(tableType).$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.ASYNC_COMPACT_ENABLE().key()), (Object)Boolean.toString(isAsyncCompaction)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()), (Object)"1")})));
    }

    public void structuredStreamingTestRunner(HoodieTableType tableType, boolean addCompactionConfigs, boolean isAsyncCompaction) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
        String sourcePath2 = (String)tuple22._1();
        String destPath2 = (String)tuple22._2();
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUpdates("001", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        long uniqueKeyCnt = inputDF2.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).distinct().count();
        scala.collection.immutable.Map<String, String> hudiOptions = addCompactionConfigs ? this.getCompactionOpts(tableType, isAsyncCompaction) : this.getOptsWithTableType(tableType);
        StreamingQuery streamingQuery = this.initWritingStreamingQuery(inputDF1.schema(), sourcePath2, destPath2, hudiOptions);
        Future f2 = Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
            int currNumCommits = this.waitTillAtleastNCommits(this.protected$storage(this), destPath2, 1, 120, 5);
            Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((HoodieStorage)this.protected$storage(this), (String)destPath2, (String)"000"));
            String commitCompletionTime1 = DataSourceTestUtils.latestCommitCompletionTime(this.protected$storage(this), destPath2);
            Dataset hoodieROViewDF1 = this.spark().read().format("org.apache.hudi").load(destPath2);
            Predef$.MODULE$.assert(hoodieROViewDF1.count() == 100L);
            inputDF2.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
            int numExpectedCommits = addCompactionConfigs ? currNumCommits + 2 : currNumCommits + 1;
            this.waitTillAtleastNCommits(this.protected$storage(this), destPath2, numExpectedCommits, 120, 5);
            HoodieTableType hoodieTableType = tableType;
            HoodieTableType hoodieTableType2 = HoodieTableType.MERGE_ON_READ;
            String commitInstantTime2 = !(hoodieTableType != null ? !hoodieTableType.equals(hoodieTableType2) : hoodieTableType2 != null) ? this.latestInstant(this.protected$storage(this), destPath2, "deltacommit") : HoodieDataSourceHelpers.latestCommit((HoodieStorage)this.protected$storage(this), (String)destPath2);
            HoodieTableType hoodieTableType3 = tableType;
            HoodieTableType hoodieTableType4 = HoodieTableType.MERGE_ON_READ;
            String commitCompletionTime2 = !(hoodieTableType3 != null ? !hoodieTableType3.equals(hoodieTableType4) : hoodieTableType4 != null) ? DataSourceTestUtils.latestDeltaCommitCompletionTime(this.protected$storage(this), destPath2) : DataSourceTestUtils.latestCommitCompletionTime(this.protected$storage(this), destPath2);
            Assertions.assertEquals((int)numExpectedCommits, (int)HoodieDataSourceHelpers.listCommitsSince((HoodieStorage)this.protected$storage(this), (String)destPath2, (String)"000").size());
            Dataset hoodieROViewDF2 = this.spark().read().format("org.apache.hudi").load(destPath2);
            Assertions.assertEquals((long)100L, (long)hoodieROViewDF2.count());
            String firstCommit = (String)HoodieDataSourceHelpers.listCommitsSince((HoodieStorage)this.protected$storage(this), (String)destPath2, (String)"000").get(0);
            Dataset hoodieIncViewDF1 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.START_COMMIT().key(), commitCompletionTime1).option(DataSourceReadOptions$.MODULE$.END_COMMIT().key(), commitCompletionTime1).load(destPath2);
            Assertions.assertEquals((long)100L, (long)hoodieIncViewDF1.count());
            Row[] countsPerCommit = (Row[])hoodieIncViewDF1.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
            Assertions.assertEquals((int)1, (int)countsPerCommit.length);
            Assertions.assertEquals((Object)firstCommit, (Object)countsPerCommit[0].get(0));
            Dataset hoodieIncViewDF2 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.START_COMMIT().key(), commitCompletionTime2).load(destPath2);
            Assertions.assertEquals((long)uniqueKeyCnt, (long)hoodieIncViewDF2.count());
            countsPerCommit = (Row[])hoodieIncViewDF2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
            Assertions.assertEquals((int)1, (int)countsPerCommit.length);
            Assertions.assertEquals((Object)commitInstantTime2, (Object)countsPerCommit[0].get(0));
            streamingQuery.stop();
        }, ExecutionContext.Implicits$.MODULE$.global());
        Await$.MODULE$.result((Awaitable)f2, Duration$.MODULE$.apply("120s"));
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testStructuredStreaming(HoodieTableType tableType) {
        this.structuredStreamingTestRunner(tableType, false, false);
    }

    private int waitTillAtleastNCommits(HoodieStorage storage, String tablePath, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long beginTime;
        long currTime = beginTime = System.currentTimeMillis();
        int timeoutMsecs = timeoutSecs * 1000;
        int numInstants = 0;
        boolean success = false;
        while (!success && currTime - beginTime < (long)timeoutMsecs) {
            try {
                try {
                    HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions((HoodieStorage)storage, (String)tablePath);
                    this.log().info(new StringBuilder(10).append("Timeline :").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(timeline.getInstants().toArray())).mkString("Array(", ", ", ")")).toString());
                    if (timeline.countInstants() < numCommits) continue;
                    numInstants = timeline.countInstants();
                    success = true;
                }
                catch (TableNotFoundException tableNotFoundException) {
                    this.log().info("Got table not found exception. Retrying");
                }
            }
            finally {
                if (success) continue;
                Thread.sleep(sleepSecsAfterEachRun * 1000);
                currTime = System.currentTimeMillis();
            }
        }
        if (!success) {
            throw new IllegalStateException(new StringBuilder(44).append("Timed-out waiting for ").append(numCommits).append(" commits to appear in ").append(tablePath).toString());
        }
        return numInstants;
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStructuredStreamingWithClustering(boolean isAsyncClustering) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath2 = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath2);
        String sourcePath2 = (String)tuple22._1();
        String destPath3 = (String)tuple22._2();
        this.structuredStreamingForTestClusteringRunner(sourcePath2, destPath3, HoodieTableType.COPY_ON_WRITE, !isAsyncClustering, isAsyncClustering, "2016/03/15", (Function1<String, BoxedUnit>)(Function1 & Serializable & scala.Serializable)destPath -> {
            this.checkClusteringResult$1(destPath);
            return BoxedUnit.UNIT;
        });
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStructuredStreamingWithCompaction(boolean isAsyncCompaction) {
        this.structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction);
    }

    @Test
    public void testStructuredStreamingWithCheckpoint() {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
        String sourcePath2 = (String)tuple22._1();
        String destPath2 = (String)tuple22._2();
        scala.collection.immutable.Map opts = this.commonOpts().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key()), (Object)WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()), (Object)InProcessLockProvider.class.getName())})));
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), "2016/03/15"))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        StructType schema = inputDF1.schema();
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        StreamingQuery query1 = this.spark().readStream().schema(schema).json(sourcePath2).writeStream().format("org.apache.hudi").options((Map)opts).outputMode(OutputMode.Append()).option(DataSourceWriteOptions$.MODULE$.STREAMING_CHECKPOINT_IDENTIFIER().key(), "streaming_identifier1").option("checkpointLocation", new StringBuilder(12).append(this.basePath).append("/checkpoint1").toString()).start(destPath2);
        query1.processAllAvailable();
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)this.storage, (String)destPath2);
        this.assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0");
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("001", Predef$.MODULE$.int2Integer(100), "2016/03/15"))).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF2.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        StreamingQuery query2 = this.spark().readStream().schema(schema).json(sourcePath2).writeStream().format("org.apache.hudi").options((Map)opts).outputMode(OutputMode.Append()).option(DataSourceWriteOptions$.MODULE$.STREAMING_CHECKPOINT_IDENTIFIER().key(), "streaming_identifier2").option("checkpointLocation", new StringBuilder(12).append(this.basePath).append("/checkpoint2").toString()).start(destPath2);
        query2.processAllAvailable();
        query1.processAllAvailable();
        query1.stop();
        query2.stop();
        this.assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0");
        this.assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1");
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        StreamingQuery query3 = this.spark().readStream().schema(schema).json(sourcePath2).writeStream().format("org.apache.hudi").options(this.commonOpts()).outputMode(OutputMode.Append()).option(DataSourceWriteOptions$.MODULE$.STREAMING_CHECKPOINT_IDENTIFIER().key(), "streaming_identifier1").option("checkpointLocation", new StringBuilder(12).append(this.basePath).append("/checkpoint1").toString()).start(destPath2);
        query3.processAllAvailable();
        query3.stop();
        metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)this.storage, (String)destPath2);
        this.assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "2");
        this.assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0");
    }

    @Test
    public void testStructuredStreamingForDefaultIdentifier() {
        this.testStructuredStreamingInternal(this.testStructuredStreamingInternal$default$1());
    }

    @Test
    public void testStructuredStreamingWithBulkInsert() {
        this.testStructuredStreamingInternal("bulk_insert");
    }

    public void testStructuredStreamingInternal(String operation) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
        String sourcePath2 = (String)tuple22._1();
        String destPath2 = (String)tuple22._2();
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), "2016/03/15"))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        StructType schema = inputDF1.schema();
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        StreamingQuery query1 = this.spark().readStream().schema(schema).json(sourcePath2).writeStream().format("org.apache.hudi").options(this.commonOpts()).option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), operation).outputMode(OutputMode.Append()).option("checkpointLocation", new StringBuilder(12).append(this.basePath).append("/checkpoint1").toString()).start(destPath2);
        query1.processAllAvailable();
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)this.storage, (String)destPath2);
        this.assertLatestCheckpointInfoMatched(metaClient, (String)DataSourceWriteOptions$.MODULE$.STREAMING_CHECKPOINT_IDENTIFIER().defaultValue(), "0");
        query1.stop();
    }

    public String testStructuredStreamingInternal$default$1() {
        return "upsert";
    }

    public void assertLatestCheckpointInfoMatched(HoodieTableMetaClient metaClient, String identifier, String expectBatchId) {
        metaClient.reloadActiveTimeline();
        Option lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter((HoodieTimeline)metaClient.getActiveTimeline().getCommitsTimeline(), (String)HoodieStreamingSink$.MODULE$.SINK_CHECKPOINT_KEY(), (String)identifier);
        Assertions.assertEquals((Object)lastCheckpoint.get(), (Object)expectBatchId);
    }

    public void structuredStreamingForTestClusteringRunner(String sourcePath, String destPath, HoodieTableType tableType, boolean isInlineClustering, boolean isAsyncClustering, String partitionOfRecords, Function1<String, BoxedUnit> checkClusteringResult) {
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), partitionOfRecords))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("001", Predef$.MODULE$.int2Integer(100), partitionOfRecords))).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        scala.collection.immutable.Map<String, String> hudiOptions = this.getClusteringOpts(tableType, Boolean.toString(isInlineClustering), Boolean.toString(isAsyncClustering), "2", 100);
        StreamingQuery streamingQuery = this.initWritingStreamingQuery(inputDF1.schema(), sourcePath, destPath, hudiOptions);
        Future f2 = Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath);
            int currNumCommits = this.waitTillAtleastNCommits(this.protected$storage(this), destPath, 1, 120, 5);
            Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((HoodieStorage)this.protected$storage(this), (String)destPath, (String)"000"));
            inputDF2.coalesce(1).write().mode(SaveMode.Append).json(sourcePath);
            currNumCommits = this.waitTillAtleastNCommits(this.protected$storage(this), destPath, currNumCommits + 1, 120, 5);
            this.protected$setmetaClient(this, HoodieTestUtils.createMetaClient((HoodieStorage)this.protected$storage(this), (String)destPath));
            checkClusteringResult.apply((Object)destPath);
            Assertions.assertEquals((int)3, (int)HoodieDataSourceHelpers.listCommitsSince((HoodieStorage)this.protected$storage(this), (String)destPath, (String)"000").size());
            Assertions.assertTrue((new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.getLatestFileGroupsFileId(partitionOfRecords))).size() > 0 ? 1 : 0) != 0);
            Dataset hoodieROViewDF2 = this.spark().read().format("org.apache.hudi").load(destPath);
            Assertions.assertEquals((long)200L, (long)hoodieROViewDF2.count());
            Row[] countsPerCommit = (Row[])hoodieROViewDF2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
            Assertions.assertEquals((int)2, (int)countsPerCommit.length);
            String commitInstantTime2 = this.latestInstant(this.protected$storage(this), destPath, "commit");
            Assertions.assertEquals((Object)commitInstantTime2, (Object)((Row)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])countsPerCommit)).maxBy((Function1 & Serializable & scala.Serializable)row -> (String)row.getAs(0), (Ordering)Ordering.String$.MODULE$)).get(0));
            streamingQuery.stop();
        }, ExecutionContext.Implicits$.MODULE$.global());
        Await$.MODULE$.result((Awaitable)f2, Duration$.MODULE$.apply("120s"));
    }

    private String[] getLatestFileGroupsFileId(String partition) {
        this.getHoodieTableFileSystemView(this.metaClient, (HoodieTimeline)this.metaClient.getActiveTimeline(), HoodieTestTable.of((HoodieTableMetaClient)this.metaClient).listAllBaseFiles());
        return (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(this.tableView.getLatestFileSlices(partition).toArray())).map((Function1 & Serializable & scala.Serializable)slice -> ((FileSlice)slice).getFileGroupId().getFileId(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
    }

    private void waitTillHasCompletedReplaceInstant(String tablePath, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long beginTime;
        long currTime = beginTime = System.currentTimeMillis();
        int timeoutMsecs = timeoutSecs * 1000;
        boolean success = false;
        while (!success && currTime - beginTime < (long)timeoutMsecs) {
            try {
                try {
                    this.metaClient.reloadActiveTimeline();
                    int completeReplaceSize = this.metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
                    Predef$.MODULE$.println((Object)new StringBuilder(20).append("completeReplaceSize:").append(completeReplaceSize).toString());
                    if (completeReplaceSize <= 0) continue;
                    success = true;
                }
                catch (TableNotFoundException tableNotFoundException) {
                    this.log().info("Got table not found exception. Retrying");
                }
            }
            finally {
                Thread.sleep(sleepSecsAfterEachRun * 1000);
                currTime = System.currentTimeMillis();
            }
        }
        if (!success) {
            throw new IllegalStateException(new StringBuilder(59).append("Timed-out waiting for completing replace instant appear in ").append(tablePath).toString());
        }
    }

    private String latestInstant(HoodieStorage storage, String basePath, String instantAction) {
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)basePath);
        return ((HoodieInstant)metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{instantAction})).filterCompletedInstants().lastInstant().get()).requestedTime();
    }

    private void streamingWrite(StructType schema, String sourcePath, String destPath, scala.collection.immutable.Map<String, String> hudiOptions) {
        StreamingQuery query = this.spark().readStream().schema(schema).json(sourcePath).writeStream().format("org.apache.hudi").options(hudiOptions).trigger(Trigger.Once()).option("checkpointLocation", new StringBuilder(11).append(this.basePath).append("/checkpoint").toString()).outputMode(OutputMode.Append()).start(destPath);
        query.processAllAvailable();
        query.stop();
    }

    @Test
    public void testStructuredStreamingWithDisabledCompaction() {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
        String sourcePath2 = (String)tuple22._1();
        String destPath2 = (String)tuple22._2();
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(10)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        scala.collection.immutable.Map opts = this.commonOpts().$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)HoodieTableType.MERGE_ON_READ.name())).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.STREAMING_DISABLE_COMPACTION().key()), (Object)"true"));
        this.streamingWrite(inputDF1.schema(), sourcePath2, destPath2, (scala.collection.immutable.Map<String, String>)opts);
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 24).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            String id = String.format("%03d", new Integer(i));
            List records = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.protected$dataGen(this).generateUpdates(id, Predef$.MODULE$.int2Integer(10)))).asScala()).toList();
            Dataset inputDF = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records, 2, ClassTag$.MODULE$.apply(String.class)));
            inputDF.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
            this.streamingWrite(inputDF.schema(), sourcePath2, destPath2, (scala.collection.immutable.Map<String, String>)opts);
        });
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((HoodieStorage)this.storage, (String)destPath2);
        Assertions.assertTrue((boolean)metaClient.getActiveTimeline().getCommitAndReplaceTimeline().empty());
        Assertions.assertEquals((int)25, (int)metaClient.getActiveTimeline().countInstants());
    }

    @ParameterizedTest
    @CsvSource(value={"COPY_ON_WRITE,EVENT_TIME_ORDERING", "MERGE_ON_READ,EVENT_TIME_ORDERING", "COPY_ON_WRITE,COMMIT_TIME_ORDERING", "MERGE_ON_READ,COMMIT_TIME_ORDERING", "COPY_ON_WRITE,CUSTOM", "MERGE_ON_READ,CUSTOM"})
    public void testStructuredStreamingWithMergeMode(String tableType, String mergeMode) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
        String sourcePath2 = (String)tuple22._1();
        String destPath2 = (String)tuple22._2();
        List records1 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).asScala()).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        scala.collection.immutable.Map opts = this.commonOpts().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.OPERATION().key()), (Object)DataSourceWriteOptions$.MODULE$.UPSERT_OPERATION_OPT_VAL()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)tableType), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORD_MERGE_MODE().key()), (Object)mergeMode), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key()), (Object)"parquet"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), (Object)"weight")})));
        String string = mergeMode;
        String string2 = "CUSTOM";
        if (!(string != null ? !string.equals(string2) : string2 != null)) {
            opts = opts.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORD_MERGE_STRATEGY_ID().key()), (Object)"aea2e14e-19a2-4e33-bc37-f8871d55bd5b"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORD_MERGE_IMPL_CLASSES().key()), (Object)HoodieSparkDeleteRecordMerger.class.getName())})));
        }
        this.streamingWrite(inputDF1.schema(), sourcePath2, destPath2, (scala.collection.immutable.Map<String, String>)opts);
        List records2 = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUniqueUpdates("001", Predef$.MODULE$.int2Integer(50)))).asScala()).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        inputDF2.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
        this.streamingWrite(inputDF2.schema(), sourcePath2, destPath2, (scala.collection.immutable.Map<String, String>)opts);
        String string3 = mergeMode;
        String string4 = "CUSTOM";
        if (!(string3 != null ? !string3.equals(string4) : string4 != null)) {
            Dataset expectedFinalRecords = inputDF1.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"partition_path"})).except(inputDF2.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"partition_path"})));
            Dataset finalRecords = this.spark().read().format("hudi").option(DataSourceWriteOptions$.MODULE$.RECORD_MERGE_IMPL_CLASSES().key(), HoodieSparkDeleteRecordMerger.class.getName()).load(destPath2).select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"partition_path"}));
            Assertions.assertEquals((long)expectedFinalRecords.count(), (long)finalRecords.count());
            Assertions.assertEquals((long)0L, (long)expectedFinalRecords.except(finalRecords).count());
            return;
        }
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((String)destPath2);
        java.util.List instants = metaClient.getActiveTimeline().getCommitsTimeline().getInstants();
        Assertions.assertEquals((int)2, (int)instants.size());
        this.spark().read().format("hudi").load(destPath2).createOrReplaceTempView("finalRecords");
        Dataset updatedRecords = this.spark().sql(new StringBuilder(86).append("select _row_key, partition_path, weight from finalRecords ").append("where _hoodie_commit_time = ").append(((HoodieInstant)instants.get(1)).requestedTime()).toString());
        String string5 = mergeMode;
        String string6 = "COMMIT_TIME_ORDERING";
        if (!(string5 != null ? !string5.equals(string6) : string6 != null)) {
            Assertions.assertEquals((long)inputDF2.count(), (long)updatedRecords.count());
            Assertions.assertEquals((long)0L, (long)inputDF2.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"partition_path", "weight"})).except(updatedRecords).count());
            return;
        }
        String string7 = mergeMode;
        String string8 = "EVENT_TIME_ORDERING";
        if (!(string7 != null ? !string7.equals(string8) : string8 != null)) {
            inputDF1.createOrReplaceTempView("input1");
            inputDF2.createOrReplaceTempView("input2");
            Dataset expectedUpdatedRecords = this.spark().sql("SELECT input2._row_key, input2.partition_path, input2.weight FROM input1 JOIN input2 ON input1._row_key = input2._row_key AND input1.partition_path = input2.partition_path WHERE input1.weight < input2.weight");
            Assertions.assertEquals((long)expectedUpdatedRecords.count(), (long)updatedRecords.count());
            Assertions.assertEquals((long)0L, (long)expectedUpdatedRecords.except(updatedRecords).count());
            return;
        }
    }

    private final void checkClusteringResult$1(String destPath) {
        this.waitTillHasCompletedReplaceInstant(destPath, 120, 1);
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals((int)1, (int)this.getLatestFileGroupsFileId("2016/03/15").length);
    }
}

