/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link S3EventsHoodieIncrSource}.
 *
 * <p>Fixtures (mocks, Spark handles, the {@code readAndAssert} / {@code setProps} /
 * {@code generateDataset} helpers) are inherited from {@link S3EventsHoodieIncrSourceHarness}.
 * Every scenario follows the same shape: write one or more S3 metadata commits, hand a synthetic
 * (path, size, commit time) dataset to the mocked query runner, then call {@code readAndAssert}
 * with a starting checkpoint, a numeric limit and the checkpoint string expected afterwards.
 *
 * <p>NOTE(review): the numeric argument of {@code readAndAssert} is presumably the source byte
 * limit; the harness is not visible from this file, so confirm there.
 */
@ExtendWith(MockitoExtension.class)
public class TestS3EventsHoodieIncrSource extends S3EventsHoodieIncrSourceHarness {

    /** The only missing-checkpoint strategy exercised by this class. */
    private static final IncrSourceHelper.MissingCheckpointStrategy READ_UPTO_LATEST =
        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;

    /** Property keys set explicitly by the filtering scenarios below. */
    private static final String S3_IGNORE_KEY_PREFIX = "hoodie.streamer.source.s3incr.ignore.key.prefix";
    private static final String SELECT_RELATIVE_PATH_REGEX = "hoodie.streamer.source.cloud.data.select.relative.path.regex";
    private static final String IGNORE_RELPATH_PREFIX = "hoodie.streamer.source.cloud.data.ignore.relpath.prefix";
    private static final String SELECT_RELPATH_PREFIX = "hoodie.streamer.source.cloud.data.select.relpath.prefix";

    /** Runs the harness set-up and then (re)creates the meta client for the test base path. */
    @Override
    @BeforeEach
    public void setUp() throws IOException {
        super.setUp();
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
    }

    /**
     * Reads with a checkpoint equal to the only written commit ("1") and a limit of 0; the
     * expected checkpoint is the key returned by the write.
     */
    @Test
    public void testEmptyCheckpoint() throws IOException {
        String commitTimeForWrites = "1";
        String commitTimeForReads = commitTimeForWrites;
        Pair<String, List<HoodieRecord>> inserts = this.writeS3MetadataRecords(commitTimeForWrites);
        this.readAndAssert(READ_UPTO_LATEST, Option.of(commitTimeForReads), 0L, inserts.getKey());
    }

    /** Three files in commit "1"; each read is expected to advance the checkpoint by one file. */
    @Test
    public void testOneFileInCommit() throws IOException {
        String commitTimeForWrites1 = "2";
        String commitTimeForReads = "1";
        this.writeS3MetadataRecords(commitTimeForReads);
        this.writeS3MetadataRecords(commitTimeForWrites1);

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs);

        Mockito.when(this.mockCloudObjectsSelectorCommon.loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), Mockito.anyInt()))
            .thenReturn(Option.empty());
        Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        this.readAndAssert(READ_UPTO_LATEST, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file1.json"), 200L, "1#path/to/file2.json");
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file2.json"), 200L, "1#path/to/file3.json");
    }

    /** Same three files as above, but with a limit of 250 the first read is expected to reach file2. */
    @Test
    public void testTwoFilesAndContinueInSameCommit() throws IOException {
        String commitTimeForWrites = "2";
        String commitTimeForReads = "1";
        this.writeS3MetadataRecords(commitTimeForReads);
        this.writeS3MetadataRecords(commitTimeForWrites);

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs);

        Mockito.when(this.mockCloudObjectsSelectorCommon.loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), Mockito.anyInt()))
            .thenReturn(Option.empty());
        Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        this.readAndAssert(READ_UPTO_LATEST, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
    }

    /**
     * Files spread over commits "1" and "2", mixed with ".ignore" entries. For non-JSON runs the
     * data-file extension property is set explicitly to {@code extension}.
     *
     * @param extension file suffix used for the data files in this run
     */
    @ParameterizedTest
    @ValueSource(strings={".json", ".gz"})
    public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOException {
        String commitTimeForWrites = "2";
        String commitTimeForReads = "1";
        this.writeS3MetadataRecords(commitTimeForReads);
        this.writeS3MetadataRecords(commitTimeForWrites);

        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        if (!extension.endsWith("json")) {
            typedProperties.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), extension);
        }

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", extension), 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", ".ignore"), 800L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", extension), 200L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", extension), 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", extension), 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", ".ignore"), 200L, "2"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs);

        Mockito.when(this.mockCloudObjectsSelectorCommon.loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), Mockito.anyInt()))
            .thenReturn(Option.empty());
        Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        this.readAndAssert(READ_UPTO_LATEST, Option.of("1"), 100L, "1#path/to/file1" + extension, typedProperties);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file1" + extension), 100L, "1#path/to/file2" + extension, typedProperties);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file2" + extension), 1000L, "2#path/to/file5" + extension, typedProperties);
    }

    /**
     * Every generated path starts with the configured ignore prefix ("path/to/skip"). The
     * assertions pin the checkpoint expected for several different starting checkpoints.
     */
    @Test
    public void testEmptyDataAfterFilter() throws IOException {
        String commitTimeForWrites = "2";
        String commitTimeForReads = "1";
        this.writeS3MetadataRecords(commitTimeForReads);
        this.writeS3MetadataRecords(commitTimeForWrites);

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip3.json", 200L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip5.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip4.json", 150L, "2"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs);

        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        typedProperties.setProperty(S3_IGNORE_KEY_PREFIX, "path/to/skip");

        this.readAndAssert(READ_UPTO_LATEST, Option.of("1"), 1000L, "2", typedProperties);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("2#path/to/skip4.json"), 1000L, "2#path/to/skip4.json", typedProperties);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("2#path/to/skip5.json"), 1000L, "2#path/to/skip5.json", typedProperties);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("2"), 1000L, "2", typedProperties);
    }

    /**
     * All files of commit "1" carry the ignored prefix; only commit "2" holds selectable files.
     *
     * @param useSourceProfile when true the supplier mock returns a {@code TestSourceProfile},
     *                         otherwise it returns {@code null}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFilterAnEntireCommit(boolean useSourceProfile) throws IOException {
        String commitTimeForWrites1 = "2";
        String commitTimeForReads = "1";
        this.writeS3MetadataRecords(commitTimeForReads);
        this.writeS3MetadataRecords(commitTimeForWrites1);

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 200L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip3.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip4.json", 50L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip5.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 150L, "2"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs);

        S3EventsHoodieIncrSourceHarness.TestSourceProfile sourceProfile =
            new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50L, 0, 10L);
        Mockito.when(this.mockCloudObjectsSelectorCommon.loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), Mockito.anyInt()))
            .thenReturn(Option.empty());
        if (useSourceProfile) {
            Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
        } else {
            Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(null);
        }

        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        typedProperties.setProperty(S3_IGNORE_KEY_PREFIX, "path/to/skip");

        this.readAndAssert(READ_UPTO_LATEST, Option.of("1"), 50L, "2#path/to/file4.json", typedProperties);
    }

    /**
     * Commit "2" contains only ignored or regex-non-matching paths, sitting between commits "1"
     * and "3". The read is asserted twice: once as configured, and once after clearing the schema
     * provider and forcing the source profile to {@code null}.
     *
     * @param useSourceProfile when true the supplier mock initially returns a
     *                         {@code TestSourceProfile}, otherwise {@code null}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws IOException {
        String commitTimeForWrites1 = "2";
        String commitTimeForWrites2 = "3";
        String commitTimeForReads = "1";
        this.writeS3MetadataRecords(commitTimeForReads);
        this.writeS3MetadataRecords(commitTimeForWrites1);
        this.writeS3MetadataRecords(commitTimeForWrites2);

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 150L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file_no_match1.json", 150L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 150L, "3"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs);

        Mockito.when(this.mockCloudObjectsSelectorCommon.loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), Mockito.anyInt()))
            .thenReturn(Option.empty());
        S3EventsHoodieIncrSourceHarness.TestSourceProfile sourceProfile =
            new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50L, 0, 10L);
        if (useSourceProfile) {
            Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
        } else {
            Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(null);
        }

        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        typedProperties.setProperty(S3_IGNORE_KEY_PREFIX, "path/to/skip");
        typedProperties.setProperty(SELECT_RELATIVE_PATH_REGEX, "path/to/file[0-9]+");

        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);

        // Second pass: no schema provider and no source profile; same expected checkpoint.
        this.schemaProvider = Option.empty();
        Mockito.when(this.sourceProfileSupplier.getSourceProfile()).thenReturn(null);
        this.readAndAssert(READ_UPTO_LATEST, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
    }

    /**
     * Runs four consecutive reads against the same dataset while changing the select/ignore
     * path properties and the source profile between reads, then verifies the partition counts
     * passed to {@code loadAsDataset} and reported to the metrics mock.
     *
     * @param snapshotCheckPoint checkpoint handed to {@code setMockQueryRunner}
     * @param exptected1 expected checkpoint after the first read (also the start of the second)
     * @param exptected2 expected checkpoint after the second read
     * @param exptected3 expected checkpoint after the third read
     * @param exptected4 expected checkpoint after the fourth read
     */
    @ParameterizedTest
    @CsvSource(value={"1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1", "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2", "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"})
    public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException {
        this.writeS3MetadataRecords("1");
        this.writeS3MetadataRecords("2");
        this.writeS3MetadataRecords("3");

        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file_no_match1.json", 50L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file_no_match2.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file_no_match3.json", 50L, "3"));
        Dataset<Row> inputDs = this.generateDataset(filePathSizeAndCommitTime);
        this.setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));

        Mockito.when(this.mockCloudObjectsSelectorCommon.loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), Mockito.anyInt()))
            .thenReturn(Option.empty());

        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        typedProperties.setProperty(IGNORE_RELPATH_PREFIX, "path/to/skip");
        typedProperties.setProperty(SELECT_RELATIVE_PATH_REGEX, "path/to/file[0-9]+");

        List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000000000L);
        int sourcePartitions = 2;

        // Read 1: ignore prefix + full-path regex.
        Mockito.when(this.sourceProfileSupplier.getSourceProfile())
            .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50000L, sourcePartitions, bytesPerPartition.get(0)));
        this.readAndAssert(READ_UPTO_LATEST, Option.empty(), 50000L, exptected1, typedProperties);

        // Read 2: select prefix combined with a regex relative to it.
        typedProperties.setProperty(SELECT_RELPATH_PREFIX, "path/to/");
        typedProperties.setProperty(SELECT_RELATIVE_PATH_REGEX, "file[0-9]+");
        Mockito.when(this.sourceProfileSupplier.getSourceProfile())
            .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(10L, sourcePartitions, bytesPerPartition.get(1)));
        this.readAndAssert(READ_UPTO_LATEST, Option.of(exptected1), 10L, exptected2, typedProperties);

        // Read 3: select prefix only, regex removed.
        typedProperties.setProperty(SELECT_RELPATH_PREFIX, "path/to");
        typedProperties.remove(SELECT_RELATIVE_PATH_REGEX);
        Mockito.when(this.sourceProfileSupplier.getSourceProfile())
            .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(2)));
        this.readAndAssert(READ_UPTO_LATEST, Option.empty(), 50L, exptected3, typedProperties);

        // Read 4: ignore prefix widened to "path/to", select prefix removed.
        typedProperties.setProperty(IGNORE_RELPATH_PREFIX, "path/to");
        typedProperties.remove(SELECT_RELPATH_PREFIX);
        Mockito.when(this.sourceProfileSupplier.getSourceProfile())
            .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(3)));
        this.readAndAssert(READ_UPTO_LATEST, Option.empty(), 50L, exptected4, typedProperties);

        // Capture the partition count given to the loader and the parallelism reported to metrics.
        ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
        ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
        Mockito.verify(this.mockCloudObjectsSelectorCommon, Mockito.atLeastOnce()).loadAsDataset(
            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(this.schemaProvider), argumentCaptor.capture());
        Mockito.verify(this.metrics, Mockito.atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());

        List<Integer> numPartitions;
        if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
            numPartitions = Arrays.asList(12, 3, sourcePartitions);
        } else {
            numPartitions = Arrays.asList(23, sourcePartitions);
        }
        Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
        Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
    }

    /**
     * Passing a {@link StreamerCheckpointV2} to {@code translateCheckpoint} must raise an
     * {@link IllegalArgumentException} with the exact message asserted below.
     */
    @Test
    public void testUnsupportedCheckpoint() {
        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        CloudDataFetcher cloudDataFetcher = new CloudDataFetcher(
            new TypedProperties(), this.jsc(), this.spark(), this.metrics, this.mockCloudObjectsSelectorCommon);
        S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(
            typedProperties,
            this.jsc(),
            this.spark(),
            this.mockQueryRunner,
            cloudDataFetcher,
            new DefaultStreamContext(this.schemaProvider.orElse(null), Option.of(this.sourceProfileSupplier)));

        Exception exception = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> incrSource.translateCheckpoint(Option.of(new StreamerCheckpointV2("1"))));
        Assertions.assertEquals("For S3EventsHoodieIncrSource, only StreamerCheckpointV1, i.e., requested time-based checkpoint, is supported. Checkpoint provided is: StreamerCheckpointV2{checkpointKey='1'}", exception.getMessage());
    }

    /** A source created reflectively through {@link UtilHelpers#createSource} must report the ROW source type. */
    @Test
    public void testCreateSource() throws IOException {
        TypedProperties typedProperties = this.setProps(READ_UPTO_LATEST);
        Source s3Source = UtilHelpers.createSource(
            S3EventsHoodieIncrSource.class.getName(),
            typedProperties,
            this.jsc(),
            this.spark(),
            this.metrics,
            new DefaultStreamContext(this.schemaProvider.orElse(null), Option.of(this.sourceProfileSupplier)));
        Assertions.assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
    }
}

