/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.BaseHoodieTableFileIndex;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.execution.datasources.HadoopFsRelation;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class TestHoodieIncrSource
extends SparkClientFunctionalTestHarness {
    private HoodieTestDataGenerator dataGen;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
    private final Option<SourceProfileSupplier> sourceProfile = Option.of((Object)Mockito.mock(SourceProfileSupplier.class));
    private final HoodieIngestionMetrics metrics = (HoodieIngestionMetrics)Mockito.mock(HoodieIngestionMetrics.class);

    public SparkConf conf() {
        return this.conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
    }

    @BeforeEach
    public void setUp() throws IOException {
        this.dataGen = new HoodieTestDataGenerator();
    }

    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
        return this.getHoodieMetaClient(storageConf, basePath, props, this.tableType);
    }

    @Test
    public void testCreateSource() {
        TypedProperties properties = new TypedProperties();
        properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", this.basePath());
        properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT.name());
        HoodieIncrSource incrSource = new HoodieIncrSource(properties, this.jsc(), this.spark(), this.metrics, (StreamContext)new DefaultStreamContext((SchemaProvider)new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), this.sourceProfile));
        org.junit.jupiter.api.Assertions.assertEquals((Object)Source.SourceType.ROW, (Object)incrSource.getSourceType());
        incrSource = new HoodieIncrSource(properties, this.jsc(), this.spark(), (StreamContext)new DefaultStreamContext((SchemaProvider)new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), this.sourceProfile));
        org.junit.jupiter.api.Assertions.assertEquals((Object)Source.SourceType.ROW, (Object)incrSource.getSourceType());
    }

    @ParameterizedTest
    @MethodSource(value={"getArgumentsForHoodieIncrSource"})
    public void testHoodieIncrSource(HoodieTableType tableType, boolean useSourceProfile, HoodieTableVersion sourceTableVersion) throws IOException {
        this.tableType = tableType;
        Properties properties = this.getPropertiesForKeyGen(true);
        properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), Integer.toString(sourceTableVersion.versionCode()));
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath(), properties);
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withWriteTableVersion(sourceTableVersion.versionCode()).withAutoUpgradeVersion(false).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(3).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        if (useSourceProfile) {
            Mockito.when((Object)((SourceProfileSupplier)this.sourceProfile.get()).getSourceProfile()).thenReturn((Object)new TestSourceProfile(Long.MAX_VALUE, 4, 500));
        } else {
            Mockito.when((Object)((SourceProfileSupplier)this.sourceProfile.get()).getSourceProfile()).thenReturn(null);
        }
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            WriteResult insert1 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime(), 98);
            WriteResult insert2 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime(), 106);
            WriteResult insert3 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime(), 114);
            WriteResult insert4 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime(), 122);
            WriteResult insert5 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime(), 130);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.empty(), 570, insert5.getInstant(), sourceTableVersion);
            StreamerCheckpointV2 instant5CheckpointV2 = new StreamerCheckpointV2(insert5.getCompletionTime());
            StreamerCheckpointV1 instant5CheckpointV1 = new StreamerCheckpointV1(insert5.getInstantTime());
            boolean sourceTableVersion8OrAbove = sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.EIGHT, (Option<Checkpoint>)Option.empty(), 570, (Checkpoint)(sourceTableVersion8OrAbove ? instant5CheckpointV2 : instant5CheckpointV1));
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.empty(), 570, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.of((Object)insert1.getInstant()), 472, insert5.getInstant(), sourceTableVersion);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.EIGHT, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV1(insert1.getInstant().requestedTime())), 472, (Checkpoint)(sourceTableVersion8OrAbove ? instant5CheckpointV2 : instant5CheckpointV1));
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV2(insert1.getInstant().getCompletionTime())), 472, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV1(insert1.getInstant().requestedTime())), 472, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.of((Object)insert4.getInstant()), 130, insert5.getInstant(), sourceTableVersion);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.EIGHT, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV1(insert4.getInstant().requestedTime())), 130, (Checkpoint)(sourceTableVersion8OrAbove ? instant5CheckpointV2 : instant5CheckpointV1));
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV2(insert4.getInstant().getCompletionTime())), 130, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV1(insert4.getInstant().requestedTime())), 130, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.empty(), 130, insert5.getInstant(), sourceTableVersion);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.EIGHT, (Option<Checkpoint>)Option.empty(), 130, (Checkpoint)(sourceTableVersion8OrAbove ? instant5CheckpointV2 : instant5CheckpointV1));
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.empty(), 130, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.of((Object)insert5.getInstant()), 0, insert5.getInstant(), sourceTableVersion);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.EIGHT, (Option<Checkpoint>)Option.of((Object)instant5CheckpointV1), 0, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)instant5CheckpointV2), 0, (Checkpoint)instant5CheckpointV1);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)instant5CheckpointV1), 0, (Checkpoint)instant5CheckpointV1);
            WriteResult insert6 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime(), 168);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.of((Object)insert5.getInstant()), 168, insert6.getInstant(), sourceTableVersion);
            StreamerCheckpointV2 instant6CheckpointV2 = new StreamerCheckpointV2(insert6.getCompletionTime());
            StreamerCheckpointV1 instant6CheckpointV1 = new StreamerCheckpointV1(insert6.getInstantTime());
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.EIGHT, (Option<Checkpoint>)Option.of((Object)instant5CheckpointV1), 168, (Checkpoint)(sourceTableVersion8OrAbove ? instant6CheckpointV2 : instant6CheckpointV1));
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)instant5CheckpointV2), 168, (Checkpoint)instant6CheckpointV1);
            this.readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, HoodieTableVersion.SIX, (Option<Checkpoint>)Option.of((Object)instant5CheckpointV1), 168, (Checkpoint)instant6CheckpointV1);
            if (useSourceProfile) {
                // empty if block
            }
        }
    }

    private static Stream<Arguments> getArgumentsForHoodieIncrSource() {
        return Stream.of(Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, true, HoodieTableVersion.EIGHT}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, true, HoodieTableVersion.EIGHT}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, false, HoodieTableVersion.EIGHT}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, false, HoodieTableVersion.EIGHT}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, true, HoodieTableVersion.SIX}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, true, HoodieTableVersion.SIX}));
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType tableType) throws IOException {
        this.tableType = tableType;
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(3).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            ArrayList<WriteResult> inserts = new ArrayList<WriteResult>();
            for (int i = 0; i < 6; ++i) {
                inserts.add(this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime()));
            }
            HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
            HoodieInstant instant4 = (HoodieInstant)activeTimeline.filter(instant -> instant.requestedTime().equals(((WriteResult)inserts.get(4)).getInstantTime())).firstInstant().get();
            HoodieCommitMetadata instant4CommitData = this.metaClient.reloadActiveTimeline().readCommitMetadata(instant4);
            activeTimeline.revertToInflight(instant4);
            this.metaClient.reloadActiveTimeline();
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.empty(), 500, ((WriteResult)inserts.get(5)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.of((Object)((WriteResult)inserts.get(0)).getInstant()), 400, ((WriteResult)inserts.get(5)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.of((Object)((WriteResult)inserts.get(2)).getInstant()), 200, ((WriteResult)inserts.get(5)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.empty(), 100, ((WriteResult)inserts.get(5)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.of((Object)((WriteResult)inserts.get(5)).getInstant()), 0, ((WriteResult)inserts.get(5)).getInstant());
            activeTimeline.reload().saveAsComplete(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), ((WriteResult)inserts.get(4)).getInstantTime()), Option.of((Object)instant4CommitData));
            instant4 = (HoodieInstant)activeTimeline.reload().filter(instant -> instant.requestedTime().equals(((WriteResult)inserts.get(4)).getInstantTime())).firstInstant().get();
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.of((Object)((WriteResult)inserts.get(3)).getInstant()), 200, instant4);
        }
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableType) throws IOException {
        this.tableType = tableType;
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            ArrayList<WriteResult> dataBatches = new ArrayList<WriteResult>();
            for (int i = 0; i < 6; ++i) {
                WriteOperationType opType = i < 4 ? WriteOperationType.BULK_INSERT : WriteOperationType.UPSERT;
                List<HoodieRecord> recordsForUpdate = i < 4 ? null : ((WriteResult)dataBatches.get(3)).getRecords();
                dataBatches.add(this.writeRecords(writeClient, opType, recordsForUpdate, writeClient.createNewInstantTime()));
                if (tableType == HoodieTableType.COPY_ON_WRITE) {
                    if (i != 2) continue;
                    writeClient.scheduleClustering(Option.empty());
                    continue;
                }
                if (tableType != HoodieTableType.MERGE_ON_READ) continue;
                if (i == 4) {
                    writeClient.scheduleCompaction(Option.empty());
                }
                if (i != 5) continue;
                writeClient.scheduleClustering(Option.empty());
            }
            dataBatches.add(this.writeRecords(writeClient, WriteOperationType.BULK_INSERT, null, writeClient.createNewInstantTime()));
            String latestCommitTimestamp = ((WriteResult)dataBatches.get(dataBatches.size() - 1)).getInstantTime();
            Option clusteringInstant = this.metaClient.getActiveTimeline().filterPendingClusteringTimeline().filter(instant -> ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)this.metaClient, (HoodieInstant)instant).isPresent()).firstInstant();
            org.junit.jupiter.api.Assertions.assertTrue((boolean)clusteringInstant.isPresent());
            org.junit.jupiter.api.Assertions.assertTrue((((HoodieInstant)clusteringInstant.get()).requestedTime().compareTo(latestCommitTimestamp) < 0 ? 1 : 0) != 0);
            if (tableType == HoodieTableType.MERGE_ON_READ) {
                Option compactionInstant = this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
                org.junit.jupiter.api.Assertions.assertTrue((boolean)compactionInstant.isPresent());
                org.junit.jupiter.api.Assertions.assertTrue((((HoodieInstant)compactionInstant.get()).requestedTime().compareTo(latestCommitTimestamp) < 0 ? 1 : 0) != 0);
            }
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.empty(), 100, ((WriteResult)dataBatches.get(0)).getInstant(), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), new TypedProperties(), HoodieTableVersion.EIGHT);
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.empty(), 500, ((WriteResult)dataBatches.get(6)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.of((Object)((WriteResult)dataBatches.get(2)).getInstant()), 200, ((WriteResult)dataBatches.get(6)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.empty(), 100, ((WriteResult)dataBatches.get(6)).getInstant());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, (Option<HoodieInstant>)Option.of((Object)((WriteResult)dataBatches.get(6)).getInstant()), 0, ((WriteResult)dataBatches.get(6)).getInstant());
        }
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType tableType) throws IOException {
        this.tableType = tableType;
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("_hoodie_commit_time").build()).build();
        TypedProperties extraProps = new TypedProperties();
        extraProps.setProperty(HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            WriteResult inserts = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime());
            WriteResult inserts2 = this.writeRecords(writeClient, WriteOperationType.INSERT, null, writeClient.createNewInstantTime());
            this.readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<HoodieInstant>)Option.empty(), 100, inserts.getInstant(), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, HoodieTableVersion.EIGHT);
        }
    }

    @Test
    public void testPartitionPruningInHoodieIncrSource() throws IOException {
        this.tableType = HoodieTableType.MERGE_ON_READ;
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).build()).build();
        ArrayList<WriteResult> inserts = new ArrayList<WriteResult>();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            for (int i = 0; i < 3; ++i) {
                inserts.add(this.writeRecordsForPartition(writeClient, WriteOperationType.BULK_INSERT, writeClient.createNewInstantTime(), HoodieTestUtils.DEFAULT_PARTITION_PATHS[i]));
            }
            TypedProperties extraProps = new TypedProperties();
            extraProps.setProperty("test.snapshot.load.max.row.count", String.valueOf(1));
            this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<Checkpoint>)Option.empty(), 100, (Checkpoint)new StreamerCheckpointV2(((WriteResult)inserts.get(0)).getCompletionTime()), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, (Option<Integer>)Option.ofNullable((Object)1));
            extraProps.setProperty("test.snapshot.load.max.row.count", String.valueOf(101));
            this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<Checkpoint>)Option.empty(), 200, (Checkpoint)new StreamerCheckpointV2(((WriteResult)inserts.get(1)).getCompletionTime()), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, (Option<Integer>)Option.ofNullable((Object)3));
            extraProps.setProperty("test.snapshot.load.max.row.count", String.valueOf(10001));
            this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<Checkpoint>)Option.empty(), 300, (Checkpoint)new StreamerCheckpointV2(((WriteResult)inserts.get(2)).getCompletionTime()), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, (Option<Integer>)Option.ofNullable((Object)3));
            extraProps.setProperty("test.snapshot.load.max.row.count", String.valueOf(101));
            this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV2(((WriteResult)inserts.get(0)).getCompletionTime())), 200, (Checkpoint)new StreamerCheckpointV2(((WriteResult)inserts.get(2)).getCompletionTime()), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, (Option<Integer>)Option.ofNullable((Object)2));
            extraProps.setProperty("test.snapshot.load.max.row.count", String.valueOf(101));
            this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV2(((WriteResult)inserts.get(1)).getCompletionTime())), 100, (Checkpoint)new StreamerCheckpointV2(((WriteResult)inserts.get(2)).getCompletionTime()), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, (Option<Integer>)Option.ofNullable((Object)1));
            extraProps.setProperty("test.snapshot.load.max.row.count", String.valueOf(101));
            this.readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, (Option<Checkpoint>)Option.of((Object)new StreamerCheckpointV2(((WriteResult)inserts.get(2)).getCompletionTime())), 0, (Checkpoint)new StreamerCheckpointV2(((WriteResult)inserts.get(2)).getCompletionTime()), (Option<String>)Option.of((Object)TestSnapshotQuerySplitterImpl.class.getName()), extraProps, (Option<Integer>)Option.ofNullable((Object)0));
        }
    }

    @Test
    void testFileIndexLogicalPlanSize() throws Exception {
        this.tableType = HoodieTableType.MERGE_ON_READ;
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
        HoodieWriteConfig writeConfig = this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
        int numFileSlices = 20;
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            for (int i = 0; i < numFileSlices; ++i) {
                this.writeRecordsForPartition(writeClient, WriteOperationType.BULK_INSERT, "100" + i, String.format("2016/03/%s", i));
            }
        }
        TestHoodieIncrSource.getArgsForLogicalPlanSizeValidation().forEach(argumentsStream -> {
            Object[] arguments = argumentsStream.get();
            int fileSlicesCachedInMemory = (Integer)arguments[0];
            long spillableMemoryBytes = (Long)arguments[1];
            boolean useSpillableMap = (Boolean)arguments[2];
            Dataset dataset = this.spark().read().option(HoodieCommonConfig.HOODIE_FILE_INDEX_USE_SPILLABLE_MAP.key(), useSpillableMap).option(HoodieCommonConfig.HOODIE_FILE_INDEX_SPILLABLE_MEMORY.key(), spillableMemoryBytes).format("hudi").load(this.basePath());
            dataset.persist(StorageLevel.MEMORY_AND_DISK_SER());
            dataset.count();
            List logicalPlanChildren = JavaConverters.seqAsJavaList((Seq)dataset.logicalPlan().children().toSeq());
            BaseHoodieTableFileIndex hoodieTableFileIndex = (BaseHoodieTableFileIndex)((HadoopFsRelation)((LogicalRelation)logicalPlanChildren.get(0)).relation()).location();
            if (useSpillableMap) {
                ExternalSpillableMap<BaseHoodieTableFileIndex.PartitionPath, List<FileSlice>> cachedAllInputFileSlices = TestHoodieIncrSource.getSpillableMap(hoodieTableFileIndex);
                org.junit.jupiter.api.Assertions.assertEquals((int)fileSlicesCachedInMemory, (int)cachedAllInputFileSlices.getInMemoryMapNumEntries());
                org.junit.jupiter.api.Assertions.assertEquals((int)(numFileSlices - fileSlicesCachedInMemory), (int)cachedAllInputFileSlices.getDiskBasedMapNumEntries());
                org.junit.jupiter.api.Assertions.assertTrue((cachedAllInputFileSlices.getCurrentInMemoryMapSize() < 2L * spillableMemoryBytes ? 1 : 0) != 0, (String)("In-memory map size is greater than expected " + cachedAllInputFileSlices.getCurrentInMemoryMapSize()));
            } else {
                HashMap<BaseHoodieTableFileIndex.PartitionPath, List<FileSlice>> cachedAllInputFileSlices = TestHoodieIncrSource.getHashMap(hoodieTableFileIndex);
                org.junit.jupiter.api.Assertions.assertEquals((int)fileSlicesCachedInMemory, (int)cachedAllInputFileSlices.size());
                org.junit.jupiter.api.Assertions.assertTrue((ObjectSizeCalculator.getObjectSize(cachedAllInputFileSlices) > spillableMemoryBytes ? 1 : 0) != 0);
            }
            dataset.unpersist();
        });
    }

    private static ExternalSpillableMap<BaseHoodieTableFileIndex.PartitionPath, List<FileSlice>> getSpillableMap(BaseHoodieTableFileIndex hoodieTableFileIndex) {
        try {
            Field cachedAllInputFileSlicesField = null;
            cachedAllInputFileSlicesField = BaseHoodieTableFileIndex.class.getDeclaredField("cachedAllInputFileSlices");
            cachedAllInputFileSlicesField.setAccessible(true);
            return (ExternalSpillableMap)cachedAllInputFileSlicesField.get(hoodieTableFileIndex);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new HoodieException("field not found in BaseHoodieTableFileIndex", (Throwable)e);
        }
    }

    private static HashMap<BaseHoodieTableFileIndex.PartitionPath, List<FileSlice>> getHashMap(BaseHoodieTableFileIndex hoodieTableFileIndex) {
        try {
            Field cachedAllInputFileSlicesField = null;
            cachedAllInputFileSlicesField = BaseHoodieTableFileIndex.class.getDeclaredField("cachedAllInputFileSlices");
            cachedAllInputFileSlicesField.setAccessible(true);
            return (HashMap)cachedAllInputFileSlicesField.get(hoodieTableFileIndex);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new HoodieException("field not found in BaseHoodieTableFileIndex", (Throwable)e);
        }
    }

    private void readAndAssertCheckpointTranslation(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, HoodieTableVersion targetTableVersion, Option<Checkpoint> checkpointToPull, int expectedCount, Checkpoint expectedCheckpoint) {
        TypedProperties properties = new TypedProperties();
        properties.put((Object)HoodieWriteConfig.WRITE_TABLE_VERSION.key(), (Object)String.valueOf(targetTableVersion.versionCode()));
        this.readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, (Option<String>)Option.empty(), properties, (Option<Integer>)Option.empty());
    }

    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<Checkpoint> checkpointToPull, int expectedCount, Checkpoint expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt, TypedProperties extraProps, Option<Integer> expectedRDDPartitions) {
        TypedProperties properties = new TypedProperties();
        if (!ConfigUtils.containsConfigProperty((TypedProperties)extraProps, (ConfigProperty)HoodieWriteConfig.WRITE_TABLE_VERSION)) {
            properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), String.valueOf(HoodieTableVersion.current().versionCode()));
        }
        properties.setProperty("hoodie.streamer.source.hoodieincr.path", this.basePath());
        properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
        properties.putAll((Map)extraProps);
        snapshotCheckPointImplClassOpt.map(className -> properties.setProperty("hoodie.deltastreamer.snapshotload.query.splitter.class.name", className));
        HoodieIncrSource incrSource = new HoodieIncrSource(properties, this.jsc(), this.spark(), this.metrics, (StreamContext)new DefaultStreamContext((SchemaProvider)new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), this.sourceProfile));
        Pair batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500L);
        org.junit.jupiter.api.Assertions.assertNotNull((Object)batchCheckPoint.getValue());
        if (expectedCount == 0) {
            org.junit.jupiter.api.Assertions.assertFalse((boolean)((Option)batchCheckPoint.getKey()).isPresent());
        } else {
            org.junit.jupiter.api.Assertions.assertEquals((long)expectedCount, (long)((Dataset)((Option)batchCheckPoint.getKey()).get()).count());
            expectedRDDPartitions.ifPresent(rddPartitions -> org.junit.jupiter.api.Assertions.assertEquals((Integer)rddPartitions, (int)((Dataset)((Option)batchCheckPoint.getKey()).get()).rdd().getNumPartitions()));
        }
        org.junit.jupiter.api.Assertions.assertEquals((Object)expectedCheckpoint, (Object)batchCheckPoint.getRight());
    }

    private void readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<HoodieInstant> checkpointToPullInstant, int expectedCount, HoodieInstant expectedCheckpointInstant, Option<String> snapshotCheckPointImplClassOpt, TypedProperties extraProps, HoodieTableVersion sourceTableVersion) {
        Option checkpointToPull = sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) ? checkpointToPullInstant.map(instant -> new StreamerCheckpointV2(instant.getCompletionTime())) : checkpointToPullInstant.map(instant -> new StreamerCheckpointV1(instant.requestedTime()));
        StreamerCheckpointV2 expectedCheckpoint = sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) ? new StreamerCheckpointV2(expectedCheckpointInstant.getCompletionTime()) : new StreamerCheckpointV1(expectedCheckpointInstant.requestedTime());
        this.readAndAssert(missingCheckpointStrategy, (Option<Checkpoint>)checkpointToPull, expectedCount, (Checkpoint)expectedCheckpoint, snapshotCheckPointImplClassOpt, extraProps, (Option<Integer>)Option.empty());
    }

    private void readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<HoodieInstant> checkpointToPull, int expectedCount, HoodieInstant expectedCheckpoint) {
        this.readAndAssertWithLatestTableVersion(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, (Option<String>)Option.empty(), new TypedProperties(), HoodieTableVersion.EIGHT);
    }

    private void readAndAssertWithLatestTableVersion(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<HoodieInstant> checkpointToPull, int expectedCount, HoodieInstant expectedCheckpoint, HoodieTableVersion sourceTableVersion) {
        this.readAndAssertWithLatestTableVersion(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, (Option<String>)Option.empty(), new TypedProperties(), sourceTableVersion);
    }

    private WriteResult writeRecords(SparkRDDWriteClient writeClient, WriteOperationType writeOperationType, List<HoodieRecord> insertRecords, String commit) throws IOException {
        return this.writeRecords(writeClient, writeOperationType, insertRecords, commit, 100);
    }

    private WriteResult writeRecords(SparkRDDWriteClient writeClient, WriteOperationType writeOperationType, List<HoodieRecord> insertRecords, String commit, int numRecords) throws IOException {
        writeClient.startCommitWithTime(commit);
        List records = writeOperationType == WriteOperationType.UPSERT ? this.dataGen.generateUpdates(commit, insertRecords) : this.dataGen.generateInserts(commit, Integer.valueOf(numRecords));
        JavaRDD result = writeOperationType == WriteOperationType.BULK_INSERT ? writeClient.bulkInsert(this.jsc().parallelize(records, 1), commit) : writeClient.upsert(this.jsc().parallelize(records, 1), commit);
        List statuses = result.collect();
        Assertions.assertNoWriteErrors((List)statuses);
        this.metaClient.reloadActiveTimeline();
        return new WriteResult((HoodieInstant)this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get(), records);
    }

    private WriteResult writeRecordsForPartition(SparkRDDWriteClient writeClient, WriteOperationType writeOperationType, String commit, String partitionPath) {
        writeClient.startCommitWithTime(commit);
        List records = this.dataGen.generateInsertsForPartition(commit, Integer.valueOf(100), partitionPath);
        JavaRDD result = writeOperationType == WriteOperationType.BULK_INSERT ? writeClient.bulkInsert(this.jsc().parallelize(records, 1), commit) : writeClient.upsert(this.jsc().parallelize(records, 1), commit);
        List statuses = result.collect();
        Assertions.assertNoWriteErrors((List)statuses);
        this.metaClient.reloadActiveTimeline();
        return new WriteResult((HoodieInstant)this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get(), records);
    }

    private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).forTable(metaClient.getTableConfig().getTableName());
    }

    private static Stream<Arguments> getArgsForLogicalPlanSizeValidation() {
        return Stream.of(Arguments.of((Object[])new Object[]{1, 3072L, true}), Arguments.of((Object[])new Object[]{20, 3072L, false}), Arguments.of((Object[])new Object[]{20, 0x1400000L, true}));
    }

    static class WriteResult {
        private HoodieInstant instant;
        private List<HoodieRecord> records;

        WriteResult(HoodieInstant instant, List<HoodieRecord> records) {
            this.instant = instant;
            this.records = records;
        }

        public HoodieInstant getInstant() {
            return this.instant;
        }

        public String getInstantTime() {
            return this.instant.requestedTime();
        }

        public String getCompletionTime() {
            return this.instant.getCompletionTime();
        }

        public List<HoodieRecord> getRecords() {
            return this.records;
        }
    }

    static class TestSourceProfile
    implements SourceProfile<Integer> {
        private final long maxSourceBytes;
        private final int sourcePartitions;
        private final int numInstantsPerFetch;

        public TestSourceProfile(long maxSourceBytes, int sourcePartitions, int numInstantsPerFetch) {
            this.maxSourceBytes = maxSourceBytes;
            this.sourcePartitions = sourcePartitions;
            this.numInstantsPerFetch = numInstantsPerFetch;
        }

        public long getMaxSourceBytes() {
            return this.maxSourceBytes;
        }

        public int getSourcePartitions() {
            return this.sourcePartitions;
        }

        public Integer getSourceSpecificContext() {
            return this.numInstantsPerFetch;
        }
    }

    private static class DummySchemaProvider
    extends SchemaProvider {
        private final Schema schema;

        public DummySchemaProvider(Schema schema) {
            super(new TypedProperties());
            this.schema = schema;
        }

        public Schema getSourceSchema() {
            return this.schema;
        }
    }
}

