/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.Serializable;
import java.util.function.BiPredicate;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Mc\u0001B\n\u0015\u0001uAQ\u0001\n\u0001\u0005\u0002\u0015B\u0011\u0002\u000b\u0001A\u0002\u0003\u0007I\u0011A\u0015\t\u0013E\u0002\u0001\u0019!a\u0001\n\u0003\u0011\u0004\"C\u001e\u0001\u0001\u0004\u0005\t\u0015)\u0003+\u0011\u001da\u0004A1A\u0005\nuBa!\u0011\u0001!\u0002\u0013q\u0004b\u0002\"\u0001\u0005\u0004%\ta\u0011\u0005\u0007\u001f\u0002\u0001\u000b\u0011\u0002#\t\u000fA\u0003!\u0019!C\u0001\u0007\"1\u0011\u000b\u0001Q\u0001\n\u0011CQA\u0015\u0001\u0005BMCQa\u0018\u0001\u0005BMCQ\u0001\u001a\u0001\u0005\u0002\u0015Dq!!\u0001\u0001\t\u0013\t\u0019\u0001C\u0004\u0002\u001c\u0001!I!!\b\t\u000f\u0005%\u0002\u0001\"\u0003\u0002,!a\u0011q\u0006\u0001\u0011\u0002\u0003\u0005\t\u0011\"\u0001\u00022!a\u0011q\b\u0001\u0011\u0002\u0003\u0005\t\u0011\"\u0001\u0002B\t!C+Z:u\u0013:\u001c'/Z7f]R\fGNU3bI^KG\u000f\u001b$vY2$\u0016M\u00197f'\u000e\fgN\u0003\u0002\u0016-\u0005Qa-\u001e8di&|g.\u00197\u000b\u0005]A\u0012\u0001\u00025vI&T!!\u0007\u000e\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0012aA8sO\u000e\u00011C\u0001\u0001\u001f!\ty\"%D\u0001!\u0015\t\tc#A\u0005uKN$X\u000f^5mg&\u00111\u0005\t\u0002\u001a\u0011>|G-[3Ta\u0006\u00148n\u00117jK:$H+Z:u\u0005\u0006\u001cX-\u0001\u0004=S:LGO\u0010\u000b\u0002MA\u0011q\u0005A\u0007\u0002)\u0005)1\u000f]1sWV\t!\u0006\u0005\u0002,_5\tAF\u0003\u0002.]\u0005\u00191/\u001d7\u000b\u0005!B\u0012B\u0001\u0019-\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0003%\u0019\b/\u0019:l?\u0012*\u0017\u000f\u0006\u00024sA\u0011AgN\u0007\u0002k)\ta'A\u0003tG\u0006d\u0017-\u0003\u00029k\t!QK\\5u\u0011\u001dQ4!!AA\u0002)\n1\u0001\u001f\u00132\u0003\u0019\u0019\b/\u0019:lA\u0005a\u0001/\u001a:CCR\u001c\u0007nU5{KV\ta\b\u0005\u00025\u007f%\u0011\u0001)\u000e\u0002\u0004\u0013:$\u0018!\u00049fe\n\u000bGo\u00195TSj,\u0007%A\bwKJLg-[2bi&|gnQ8m+\u0005!\u0005CA#M\u001d\t1%\n\u0005\u0002Hk5\t\u0001J\u0003\u0002J9\u00051AH]8pizJ!aS\u001b\u0002\rA\u0013X\rZ3g\u0013\tieJ\u0001\u0004TiJLgn\u001a\u0006\u0003\u0017V\n\u0001C^3sS\u001aL7-\u0019;j_:\u001cu\u000e\u001c\u0011\u0002-U\u0004H-\u0019;fIZ+'/\u001b4jG\u0006$\u0018n\u001c8WC2\fq#\u001e9eCR,GMV3sS\u001aL7-\u0019;j_:4\u0016\r\u001c\u0011\u0002\u000bM,G/\u00169\u0015\u0003MB#aC+\u0011\u0005YkV\"A,\u000b\u0005aK\u0016aA1qS*\u0011!lW\u0001\bUV\u0004\u0018\u000e^3s\u0015\ta&$A\u0003kk:LG/\u0003\u0002_/\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002\u0011Q,\u0017M\u001d#po:D#\u0001D1\u0011\u0005Y\u0013\u0017BA2X\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u0019uKN$h)Y5m\u000b\u0006\u0014H.\u001f$pe&s7M\u001d,jK^\fV/\u001a:z\r>\u0014hj\u001c8Fq&\u001cH/\u001b8h\r&dWm\u001d\u000b\u0003g\u0019DQaZ\u0007A\u0002!\f\u0011\u0002^1cY\u0016$\u0016\u0010]3\u0011\u0005%tW\"\u00016\u000b\u0005-d\u0017!B7pI\u0016d'BA7\u0017\u0003\u0019\u0019w.\\7p]&\u0011qN\u001b\u0002\u0010\u0011>|G-[3UC\ndW\rV=qK\"\"Q\"]={!\t\u0011x/D\u0001t\u0015\t!X/\u0001\u0005qe>4\u0018\u000eZ3s\u0015\t1\u0018,\u0001\u0004qCJ\fWn]\u0005\u0003qN\u0014!\"\u00128v[N{WO]2f\u0003\u00151\u0018\r\\;fG\u0005A\u0007FA\u0007}!\tih0D\u0001v\u0013\tyXOA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fQD];o\u0013:\u001c'/Z7f]R\fG.U;fef\fe\u000eZ\"p[B\f'/\u001a\u000b\ng\u0005\u0015\u0011\u0011BA\u0007\u0003#Aa!a\u0002\u000f\u0001\u0004!\u0015aB:uCJ$Hk\u001d\u0005\u0007\u0003\u0017q\u0001\u0019\u0001#\u0002\u000b\u0015tG\rV:\t\r\u0005=a\u00021\u0001?\u0003!\u0011\u0017\r^2i\u001dVl\u0007bBA\n\u001d\u0001\u0007\u0011QC\u0001\u0016M\u0006dGNQ1dW\u001a+H\u000e\u001c+bE2,7kY1o!\r!\u0014qC\u0005\u0004\u00033)$a\u0002\"p_2,\u0017M\\\u0001+g\"|W\u000f\u001c3UQJ|wo\u00159be.,\u0005pY3qi&|g.\u00134GC2d'-Y2l\u0013N4\u0015\r\\:f)\r\u0019\u0014q\u0004\u0005\b\u0003Cy\u0001\u0019AA\u0012\u0003\t1g\u000e\u0005\u00035\u0003K\u0019\u0014bAA\u0014k\tIa)\u001e8di&|g\u000eM\u0001\u001dg\"|W\u000f\u001c3UQJ|w/\u00134GC2d'-Y2l\u0013N4\u0015\r\\:f)\r\u0019\u0014Q\u0006\u0005\b\u0003C\u0001\u0002\u0019AA\u0012\u0003E\u0001(o\u001c;fGR,G\r\n3bi\u0006<UM\u001c\u000b\u0005\u0003g\ti\u0004\u0005\u0003\u00026\u0005eRBAA\u001c\u0015\t\tC.\u0003\u0003\u0002<\u0005]\"a\u0006%p_\u0012LW\rV3ti\u0012\u000bG/Y$f]\u0016\u0014\u0018\r^8s\u0011\u001dQ\u0014#!AA\u0002\u0019\n!\u0003\u001d:pi\u0016\u001cG/\u001a3%E\u0006\u001cX\rU1uQR!\u00111IA)!\u0011\t)%a\u0014\u000e\u0005\u0005\u001d#\u0002BA%\u0003\u0017\nA\u0001\\1oO*\u0011\u0011QJ\u0001\u0005U\u00064\u0018-C\u0002N\u0003\u000fBqA\u000f\n\u0002\u0002\u0003\u0007a\u0005")
public class TestIncrementalReadWithFullTableScan
extends HoodieSparkClientTestBase {
    private SparkSession spark;
    private final int perBatchSize;
    private final String verificationCol;
    private final String updatedVerificationVal;

    public /* synthetic */ HoodieTestDataGenerator protected$dataGen(TestIncrementalReadWithFullTableScan x$1) {
        return x$1.dataGen;
    }

    public /* synthetic */ String protected$basePath(TestIncrementalReadWithFullTableScan x$1) {
        return x$1.basePath;
    }

    public SparkSession spark() {
        return this.spark;
    }

    public void spark_$eq(SparkSession x$1) {
        this.spark = x$1;
    }

    private int perBatchSize() {
        return this.perBatchSize;
    }

    public String verificationCol() {
        return this.verificationCol;
    }

    public String updatedVerificationVal() {
        return this.updatedVerificationVal;
    }

    @BeforeEach
    public void setUp() {
        this.setTableName("hoodie_test");
        this.initPath();
        this.initSparkContexts();
        this.spark_$eq(this.sqlContext.sparkSession());
        this.initTestDataGenerator();
        this.initHoodieStorage();
    }

    @AfterEach
    public void tearDown() {
        this.spark_$eq(null);
        this.cleanupResources();
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testFailEarlyForIncrViewQueryForNonExistingFiles(HoodieTableType tableType) {
        scala.collection.immutable.Map commonOpts = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.insert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.upsert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), (Object)"_row_key"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)"partition"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), (Object)"timestamp"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieWriteConfig.TBL_NAME.key()), (Object)"hoodie_test"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key()), (Object)"1")}));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            List records = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(RawTripTestPayload.recordsToStrings((java.util.List)this.protected$dataGen(this).generateInserts(new StringOps(Predef$.MODULE$.augmentString("%05d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)i)})), Predef$.MODULE$.int2Integer(this.perBatchSize())))).asScala()).toList();
            Dataset inputDF = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records, 2, ClassTag$.MODULE$.apply(String.class)));
            inputDF.write().format("org.apache.hudi").options((Map)commonOpts).option(DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key(), tableType.name()).option("hoodie.clean.commits.retained", "3").option("hoodie.keep.min.commits", "4").option("hoodie.keep.max.commits", "7").option(DataSourceWriteOptions$.MODULE$.OPERATION().key(), DataSourceWriteOptions$.MODULE$.INSERT_OPERATION_OPT_VAL()).mode(SaveMode.Append).save(this.protected$basePath(this));
        });
        HoodieTableMetaClient hoodieMetaClient = this.createMetaClient(this.spark(), this.basePath);
        HoodieTimeline completedCommits = hoodieMetaClient.getCommitsTimeline().filterCompletedInstants();
        Object[] archivedInstants = hoodieMetaClient.getArchivedTimeline().filterCompletedInstants().getInstantsAsStream().distinct().toArray();
        int nCompletedCommits = completedCommits.getInstants().size();
        int nArchivedInstants = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(archivedInstants)).size();
        Assertions.assertTrue((nCompletedCommits >= 3 ? 1 : 0) != 0);
        Assertions.assertTrue((nArchivedInstants >= 3 ? 1 : 0) != 0);
        String startUnarchivedCompletionTs = ((HoodieInstant)completedCommits.nthInstant(1).get()).getCompletionTime();
        String endUnarchivedCompletionTs = ((HoodieInstant)completedCommits.nthInstant(1).get()).getCompletionTime();
        String startArchivedCompletionTs = ((HoodieInstant)archivedInstants[1]).getCompletionTime();
        String endArchivedCompletionTs = ((HoodieInstant)archivedInstants[1]).getCompletionTime();
        String startOutOfRangeCommitTs = hoodieMetaClient.createNewInstantTime();
        String endOutOfRangeCommitTs = hoodieMetaClient.createNewInstantTime();
        Assertions.assertTrue((boolean)InstantComparison.compareTimestamps((String)startOutOfRangeCommitTs, (BiPredicate)InstantComparison.GREATER_THAN, (String)((HoodieInstant)completedCommits.lastInstant().get()).requestedTime()));
        Assertions.assertTrue((boolean)InstantComparison.compareTimestamps((String)endOutOfRangeCommitTs, (BiPredicate)InstantComparison.GREATER_THAN, (String)((HoodieInstant)completedCommits.lastInstant().get()).requestedTime()));
        this.runIncrementalQueryAndCompare(startArchivedCompletionTs, endArchivedCompletionTs, 1, true);
        this.shouldThrowIfFallbackIsFalse((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.runIncrementalQueryAndCompare(startArchivedCompletionTs, endUnarchivedCompletionTs, nArchivedInstants + 1, false));
        this.runIncrementalQueryAndCompare(startArchivedCompletionTs, endUnarchivedCompletionTs, nArchivedInstants + 1, true);
        this.shouldThrowSparkExceptionIfFallbackIsFalse((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.runIncrementalQueryAndCompare(startUnarchivedCompletionTs, endUnarchivedCompletionTs, 1, false));
        this.runIncrementalQueryAndCompare(startUnarchivedCompletionTs, endUnarchivedCompletionTs, 1, true);
        this.runIncrementalQueryAndCompare(startUnarchivedCompletionTs, endOutOfRangeCommitTs, nCompletedCommits - 1, true);
        this.runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, false);
        this.runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, true);
        this.runIncrementalQueryAndCompare(startUnarchivedCompletionTs, HoodieInstantTimeGenerator.instantTimeMinusMillis((String)startUnarchivedCompletionTs, (long)1L), 0, false);
        this.runIncrementalQueryAndCompare(startUnarchivedCompletionTs, HoodieInstantTimeGenerator.instantTimeMinusMillis((String)startUnarchivedCompletionTs, (long)1L), 0, true);
        Object[] reversedCommits = completedCommits.getReverseOrderedInstants().toArray();
        String startUncleanedCompletionTs = ((HoodieInstant)reversedCommits[0]).getCompletionTime();
        String endUncleanedCompletionTs = ((HoodieInstant)reversedCommits[0]).getCompletionTime();
        this.runIncrementalQueryAndCompare(startUncleanedCompletionTs, endUncleanedCompletionTs, 1, true);
        this.runIncrementalQueryAndCompare(startUncleanedCompletionTs, endUncleanedCompletionTs, 1, false);
    }

    private void runIncrementalQueryAndCompare(String startTs, String endTs, int batchNum, boolean fallBackFullTableScan) {
        Dataset hoodieIncViewDF = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.START_COMMIT().key(), startTs).option(DataSourceReadOptions$.MODULE$.END_COMMIT().key(), endTs).option(DataSourceReadOptions$.MODULE$.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), fallBackFullTableScan).load(this.basePath);
        Assertions.assertEquals((long)(this.perBatchSize() * batchNum), (long)hoodieIncViewDF.count());
    }

    private void shouldThrowSparkExceptionIfFallbackIsFalse(Function0<BoxedUnit> fn) {
        String msg = "Should fail with Path does not exist";
        SparkException exp = (SparkException)Assertions.assertThrows(SparkException.class, (Executable)new Executable(null, fn){
            private final Function0 fn$1;

            public void execute() {
                this.fn$1.apply$mcV$sp();
            }
            {
                this.fn$1 = fn$1;
            }
        }, (String)msg);
        Assertions.assertTrue((boolean)exp.getMessage().contains("FileNotFoundException"));
    }

    private void shouldThrowIfFallbackIsFalse(Function0<BoxedUnit> fn) {
        String msg = "Should fail with Path does not exist";
        SparkException exp = (SparkException)Assertions.assertThrows(SparkException.class, (Executable)new Executable(null, fn){
            private final Function0 fn$2;

            public void execute() {
                this.fn$2.apply$mcV$sp();
            }
            {
                this.fn$2 = fn$2;
            }
        }, (String)msg);
        Assertions.assertTrue((boolean)exp.getMessage().contains("FileNotFoundException"), (String)new StringBuilder(67).append("Expected to fail with 'FileNotFoundException' but the message was: ").append(exp.getMessage()).toString());
    }

    public TestIncrementalReadWithFullTableScan() {
        this.perBatchSize = 100;
        this.verificationCol = "driver";
        this.updatedVerificationVal = "driver_update";
    }
}

