/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestGcsEventsHoodieIncrSource
extends SparkClientFunctionalTestHarness {
    /** Avro schema loaded from the {@code /streamer-config/gcs-metadata.avsc} test resource; used for the fixture records and the write config. */
    private static final Schema GCS_METADATA_SCHEMA =
            SchemaTestUtil.getSchemaFromResource(TestGcsEventsHoodieIncrSource.class, "/streamer-config/gcs-metadata.avsc", true);
    /** Extension given to the extra fixture files in the cross-commit test that do not carry the extension under test. */
    private static final String IGNORE_FILE_EXTENSION = ".ignore";
    private static final Logger LOG = LoggerFactory.getLogger(TestGcsEventsHoodieIncrSource.class);

    /** Serializes the synthetic GCS event metadata maps to JSON. Never reassigned, hence final. */
    private final ObjectMapper mapper = new ObjectMapper();

    /** JUnit-managed temporary directory; {@link #basePath()} is derived from it. */
    @TempDir
    protected Path tempDir;

    // Mockito mocks, initialised in setUp().
    @Mock
    QueryRunner queryRunner;
    // NOTE(review): this mock is not referenced by any test in this class.
    @Mock
    QueryInfo queryInfo;
    @Mock
    CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
    @Mock
    HoodieIngestionMetrics metrics;
    @Mock
    SourceProfileSupplier sourceProfileSupplier;

    /** File-based schema provider built in setUp() and handed to every source under test. */
    protected Option<SchemaProvider> schemaProvider;
    private HoodieTableMetaClient metaClient;
    private JavaSparkContext jsc;

    /**
     * Builds the per-test fixtures: the table meta client, a {@link JavaSparkContext} wrapping the
     * harness Spark context, a {@link FilebasedSchemaProvider} pointing at the
     * {@code schema/sample_gcs_data.avsc} test resource, and the {@code @Mock} fields.
     *
     * @throws IOException declared by the original signature
     * @throws IllegalStateException if the schema resource is not on the test classpath
     */
    @BeforeEach
    public void setUp() throws IOException {
        this.metaClient = this.getHoodieMetaClient(this.storageConf(), this.basePath());
        this.jsc = JavaSparkContext.fromSparkContext(this.spark().sparkContext());

        // ClassLoader.getResource returns null for a missing resource; fail with a readable
        // message rather than an anonymous NullPointerException.
        String schemaResource = "schema/sample_gcs_data.avsc";
        java.net.URL schemaUrl = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource(schemaResource);
        if (schemaUrl == null) {
            throw new IllegalStateException("Test resource not found on classpath: " + schemaResource);
        }

        TypedProperties props = new TypedProperties();
        props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaUrl.getPath());
        props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
        this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, this.jsc));

        MockitoAnnotations.initMocks(this);
    }

    /**
     * Returns the base path used for the Hudi table in these tests.
     *
     * @return the absolute form of the JUnit temp directory, rendered as a URI string
     */
    public String basePath() {
        Path absoluteTempDir = this.tempDir.toAbsolutePath();
        return absoluteTempDir.toUri().toString();
    }

    /**
     * Writes at commit "1" and reads from checkpoint "1"; expects the next checkpoint to equal
     * the commit time that was written.
     *
     * @throws IOException propagated from writing the fixture records
     */
    @Test
    public void shouldNotFindNewDataIfCommitTimeOfWriteAndReadAreEqual() throws IOException {
        // The same instant is used for the write and as the checkpoint to resume from.
        final String commitTime = "1";
        Pair<String, List<HoodieRecord>> written = writeGcsMetadataRecords(commitTime);

        Option<String> checkpointToPull = Option.of(commitTime);
        String expectedCheckpoint = written.getKey();
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, checkpointToPull, 100L, expectedCheckpoint);
    }

    /**
     * Reads from checkpoint "1" after a write at commit "2", with a source limit of 100, against
     * three mocked file entries (sizes 100, 150, 200) in commit "1"; expects the next checkpoint
     * to be {@code 1#path/to/file1.json}.
     *
     * @throws IOException propagated from writing the fixture records
     */
    @Test
    public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOException {
        String commitTimeForWrites = "2";
        String commitTimeForReads = "1";
        // Called only for its effect on the source table; the returned pair is not needed here.
        writeGcsMetadataRecords(commitTimeForWrites);

        // Each entry is (object path, size, commit time).
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

        setMockQueryRunner(inputDs);
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
    }

    /**
     * With a source limit of 250 and three mocked file entries (sizes 100, 150, 200) in commit
     * "1", performs two successive reads: the first from checkpoint "1" is expected to stop at
     * {@code 1#path/to/file2.json}, the second resumes there and is expected to reach
     * {@code 1#path/to/file3.json}.
     *
     * @throws IOException propagated from writing the fixture records
     */
    @Test
    public void testTwoFilesAndContinueInSameCommit() throws IOException {
        String commitTimeForWrites = "2";
        String commitTimeForReads = "1";
        // Called only for its effect on the source table; the returned pair is not needed here.
        writeGcsMetadataRecords(commitTimeForWrites);

        // Each entry is (object path, size, commit time).
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

        setMockQueryRunner(inputDs);
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
    }

    /**
     * Feeds the mocked query runner 10001 {@code .parquet} entries followed by three
     * {@code .json} entries, all in commit "1", and reads twice with a source limit of 250.
     * The expected checkpoints are {@code 1#path/to/file10006.json} and then
     * {@code 1#path/to/file10007.json}.
     *
     * @throws IOException propagated from writing the fixture records
     */
    @Test
    public void largeBootstrapWithFilters() throws IOException {
        String commitTimeForWrites = "2";
        String commitTimeForReads = "1";
        // Called only for its effect on the source table; the returned pair is not needed here.
        writeGcsMetadataRecords(commitTimeForWrites);

        // Each entry is (object path, size, commit time).
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        for (int i = 0; i <= 10000; ++i) {
            filePathSizeAndCommitTime.add(Triple.of("path/to/file" + i + ".parquet", 100L, "1"));
        }
        filePathSizeAndCommitTime.add(Triple.of("path/to/file10005.json", 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file10006.json", 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file10007.json", 200L, "1"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

        setMockQueryRunner(inputDs);
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
    }

    /**
     * Runs three successive reads over mocked file entries spread across commits "1" and "2",
     * swapping the stubbed source profile before each read, and finally checks the partition
     * counts passed to {@code CloudObjectsSelectorCommon.loadAsDataset}.
     *
     * <p>For any extension not ending in "json" the extension is also set through
     * {@link CloudSourceConfig#CLOUD_DATAFILE_EXTENSION}. Entries named with
     * {@code IGNORE_FILE_EXTENSION} are mixed in; none of the expected checkpoints refers to them.
     *
     * @param extension file extension of the data files under test
     * @throws IOException propagated from writing the fixture records
     */
    @ParameterizedTest
    @ValueSource(strings = {".json", ".gz"})
    public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOException {
        String commitTimeForWrites = "2";
        // Called only for its effect on the source table; the returned pair is not needed here.
        writeGcsMetadataRecords(commitTimeForWrites);

        TypedProperties typedProperties = setProps(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
        if (!extension.endsWith("json")) {
            typedProperties.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), extension);
        }

        // Each entry is (object path, size, commit time).
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", extension), 100L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", IGNORE_FILE_EXTENSION), 800L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", extension), 200L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", extension), 150L, "1"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", extension), 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", IGNORE_FILE_EXTENSION), 200L, "2"));
        filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

        // One value per read, passed as the third TestSourceProfile constructor argument.
        List<Long> bytesPerPartition = Arrays.asList(10L, 100L, -1L);
        setMockQueryRunner(inputDs);

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(100L, 0, bytesPerPartition.get(0)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of("1"), 100L, "1#path/to/file1" + extension, typedProperties);

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(100L, 0, bytesPerPartition.get(1)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of("1#path/to/file1" + extension), 100L, "1#path/to/file2" + extension, typedProperties);

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(1000L, 0, bytesPerPartition.get(2)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of("1#path/to/file2" + extension), 1000L, "2#path/to/file5" + extension, typedProperties);

        // The partition count handed to loadAsDataset is captured once per read, in call order.
        List<Integer> numPartitions = Arrays.asList(12, 2, 1);
        ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
        Mockito.verify(cloudObjectsSelectorCommon, Mockito.atLeastOnce())
                .loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture());
        org.junit.jupiter.api.Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
    }

    /**
     * Writes commits "1", "2" and "3", mocks the query runner so every query's end instant is
     * replaced by {@code snapshotCheckPoint}, and performs four reads with different stubbed
     * source profiles and ignore-prefix settings. It then verifies the partition counts passed
     * to {@code loadAsDataset} and reported to {@code updateStreamerSourceParallelism}.
     *
     * @param snapshotCheckPoint end instant injected into the mocked query info
     * @param exptected1 expected checkpoint of the first read (started without a checkpoint)
     * @param exptected2 expected checkpoint of the second read (resumed from {@code exptected1})
     * @param exptected3 expected checkpoint of the third read (started without a checkpoint)
     * @param exptected4 expected checkpoint of the fourth read, made with ignore prefix {@code path/to}
     * @throws IOException propagated from writing the fixture records
     */
    @ParameterizedTest
    @CsvSource(value = {
            "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
            "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
            "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"})
    public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException {
        writeGcsMetadataRecords("1");
        writeGcsMetadataRecords("2");
        writeGcsMetadataRecords("3");

        // Each entry is (object path, size, commit time).
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
        setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));

        TypedProperties typedProperties = setProps(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
        typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");

        // One value per read, passed as the third TestSourceProfile constructor argument.
        // (A former thenReturn(null) stub was dropped: it was replaced before anything read it.)
        List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000000000L);
        int sourcePartitions = 2;

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50000L, sourcePartitions, bytesPerPartition.get(0)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.empty(), 50000L, exptected1, typedProperties);

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(10L, sourcePartitions, bytesPerPartition.get(1)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.of(exptected1), 10L, exptected2, typedProperties);

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(2)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.empty(), 50L, exptected3, typedProperties);

        // Widen the ignore prefix so that it covers every fixture path.
        typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new S3EventsHoodieIncrSourceHarness.TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(3)));
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
                Option.empty(), 50L, exptected4, typedProperties);

        ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
        ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
        Mockito.verify(cloudObjectsSelectorCommon, Mockito.atLeastOnce())
                .loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture());
        Mockito.verify(metrics, Mockito.atLeastOnce())
                .updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());

        List<Integer> numPartitions = snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")
                ? Arrays.asList(12, 3, sourcePartitions)
                : Arrays.asList(23, sourcePartitions);
        org.junit.jupiter.api.Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
        org.junit.jupiter.api.Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
    }

    /**
     * Verifies that translating a {@link StreamerCheckpointV2} throws an
     * {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testUnsupportedCheckpoint() {
        TypedProperties props = setProps(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
        CloudDataFetcher dataFetcher = new CloudDataFetcher(props, jsc(), spark(), metrics, cloudObjectsSelectorCommon);
        StreamContext streamContext = new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier));
        GcsEventsHoodieIncrSource incrSource =
                new GcsEventsHoodieIncrSource(props, jsc(), spark(), dataFetcher, queryRunner, streamContext);

        IllegalArgumentException thrown = org.junit.jupiter.api.Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> incrSource.translateCheckpoint(Option.of(new StreamerCheckpointV2("1"))));

        String expectedMessage = "For GcsEventsHoodieIncrSource, only StreamerCheckpointV1, i.e., requested time-based checkpoint, is supported. Checkpoint provided is: StreamerCheckpointV2{checkpointKey='1'}";
        org.junit.jupiter.api.Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Creates the source reflectively through {@link UtilHelpers#createSource} and checks that it
     * reports {@code Source.SourceType.ROW}; the same call with empty properties is expected to
     * throw an {@link IOException}.
     *
     * @throws IOException propagated from the first {@code createSource} call
     */
    @Test
    public void testCreateSource() throws IOException {
        String sourceClassName = GcsEventsHoodieIncrSource.class.getName();
        TypedProperties props = setProps(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT);
        StreamContext streamContext = new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier));

        Source gcsSource = UtilHelpers.createSource(sourceClassName, props, jsc(), spark(), metrics, streamContext);
        org.junit.jupiter.api.Assertions.assertEquals(Source.SourceType.ROW, gcsSource.getSourceType());

        // Every argument is built inside the lambda so any failure surfaces within assertThrows.
        org.junit.jupiter.api.Assertions.assertThrows(IOException.class, () -> UtilHelpers.createSource(
                GcsEventsHoodieIncrSource.class.getName(),
                new TypedProperties(),
                jsc(),
                spark(),
                metrics,
                new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))));
    }

    /**
     * Stubs the mocked query runner to serve {@code inputDs} without overriding the end instant
     * of the incoming query.
     *
     * @param inputDs dataset the mocked runner should return
     */
    private void setMockQueryRunner(Dataset<Row> inputDs) {
        Option<String> noEndInstantOverride = Option.empty();
        setMockQueryRunner(inputDs, noEndInstantOverride);
    }

    /**
     * Stubs {@code queryRunner.run} to answer with the incoming {@link QueryInfo}, whose end
     * instant is replaced when {@code nextCheckPointOpt} is present, paired with {@code inputDs}.
     * For snapshot queries the dataset is restricted to rows whose commit time lies between the
     * start and end instants (inclusive, string comparison).
     *
     * @param inputDs dataset the mocked runner should serve
     * @param nextCheckPointOpt optional end instant to substitute into the query info
     */
    private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
        Mockito.when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
            QueryInfo requested = invocation.getArgument(0);
            QueryInfo effective = nextCheckPointOpt.map(requested::withUpdatedEndInstant).orElse(requested);

            if (!effective.isSnapshot()) {
                return Pair.of(effective, inputDs);
            }

            String fromStart = String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, effective.getStartInstant());
            String uptoEnd = String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, effective.getEndInstant());
            return Pair.of(effective, inputDs.filter(fromStart).filter(uptoEnd));
        });
    }

    /**
     * Builds a {@link GcsEventsHoodieIncrSource} from the given properties and the mocked
     * collaborators, fetches one batch and asserts that the returned checkpoint equals a
     * {@link StreamerCheckpointV1} wrapping {@code expectedCheckpoint}.
     *
     * @param missingCheckpointStrategy not used by this overload; callers are expected to have
     *        encoded it in {@code typedProperties} already
     * @param checkpointToPull checkpoint to resume from, or empty to start without one
     * @param sourceLimit limit passed straight through to {@code fetchNextBatch}
     * @param expectedCheckpoint checkpoint key the fetch is expected to return
     * @param typedProperties source configuration
     */
    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint, TypedProperties typedProperties) {
        GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(
                typedProperties,
                jsc(),
                spark(),
                new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, cloudObjectsSelectorCommon),
                queryRunner,
                new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));

        Pair<Option<Dataset<Row>>, Checkpoint> dataAndCheckpoint = incrSource.fetchNextBatch(
                checkpointToPull.isPresent() ? Option.of(new StreamerCheckpointV1(checkpointToPull.get())) : Option.empty(),
                sourceLimit);

        // Only the checkpoint is asserted; the dataset half of the pair is not inspected.
        Checkpoint nextCheckPoint = dataAndCheckpoint.getRight();
        org.junit.jupiter.api.Assertions.assertNotNull(nextCheckPoint);
        org.junit.jupiter.api.Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint);
    }

    /**
     * Convenience overload: builds the default properties for the given strategy, sets
     * {@code hoodie.streamer.source.hoodieincr.file.format} to {@code json}, and delegates to the
     * five-argument overload.
     *
     * @param missingCheckpointStrategy strategy written into the generated properties
     * @param checkpointToPull checkpoint to resume from, or empty to start without one
     * @param sourceLimit limit passed through to the fetch
     * @param expectedCheckpoint checkpoint key the fetch is expected to return
     */
    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
        TypedProperties jsonProps = setProps(missingCheckpointStrategy);
        jsonProps.put("hoodie.streamer.source.hoodieincr.file.format", "json");

        readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, jsonProps);
    }

    /**
     * Builds one synthetic GCS object-metadata record conforming to {@code GCS_METADATA_SCHEMA}.
     * The record key is {@code id:<bucket>/<filename>/<generation>} and the partition path is the
     * bucket name; every other field not derived from the arguments is a fixed sample value.
     *
     * @param commitTime commit time; parsed as a long for the {@code timestamp} field
     * @param filename object name
     * @param bucketName bucket name, also used as the partition path
     * @param generation object generation
     * @return an Avro-payload Hoodie record keyed by the generated id
     */
    private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) {
        String recordId = "id:" + bucketName + "/" + filename + "/" + generation;
        String downloadLink = String.format("https://storage.googleapis.com/download/storage/v1/b/%s/o/%s?generation=%s&alt=media", bucketName, filename, generation);
        String objectLink = String.format("https://www.googleapis.com/storage/v1/b/%s/o/%s", bucketName, filename);

        GenericData.Record metadata = new GenericData.Record(GCS_METADATA_SCHEMA);

        // Key, partition and ordering fields.
        metadata.put("_row_key", recordId);
        metadata.put("partition_path", bucketName);
        metadata.put("timestamp", Long.parseLong(commitTime));

        // GCS object attributes.
        metadata.put("bucket", bucketName);
        metadata.put("contentLanguage", "en");
        metadata.put("contentType", "application/octet-stream");
        metadata.put("crc32c", "oRB3Aw==");
        metadata.put("etag", "CP7EwYCu6/kCEAE=");
        metadata.put("generation", generation);
        metadata.put("id", recordId);
        metadata.put("kind", "storage#object");
        metadata.put("md5Hash", "McsS8FkcDSrB3cGfb18ysA==");
        metadata.put("mediaLink", downloadLink);
        metadata.put("metageneration", "1");
        metadata.put("name", filename);
        metadata.put("selfLink", objectLink);
        metadata.put("size", "370");
        metadata.put("storageClass", "STANDARD");
        metadata.put("timeCreated", "2022-08-29T05:52:55.869Z");
        metadata.put("timeStorageClassUpdated", "2022-08-29T05:52:55.869Z");
        metadata.put("updated", "2022-08-29T05:52:55.869Z");

        HoodieKey key = new HoodieKey(recordId, bucketName);
        HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(metadata));
        return new HoodieAvroRecord(key, payload);
    }

    /**
     * Builds the write config used to populate the source table: the base builder from
     * {@code getConfigBuilder} plus archival ({@code archiveCommitsWith(2, 3)}), cleaning
     * ({@code retainCommits(1)}) and metadata
     * ({@code withMaxNumDeltaCommitsBeforeCompaction(1)}) settings.
     *
     * @return the assembled write config
     */
    private HoodieWriteConfig getWriteConfig() {
        HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
                .archiveCommitsWith(2, 3)
                .build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
                .retainCommits(1)
                .build();
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .withMaxNumDeltaCommitsBeforeCompaction(1)
                .build();

        return getConfigBuilder(basePath(), this.metaClient)
                .withArchivalConfig(archivalConfig)
                .withCleanConfig(cleanConfig)
                .withMetadataConfig(metadataConfig)
                .build();
    }

    /**
     * Upserts four synthetic GCS metadata records ({@code data-file-1.json} to
     * {@code data-file-4.json} in {@code bucket-1}, generation "1") at the given commit time and
     * asserts that the write reported no errors.
     *
     * @param commitTime commit time to start and write under
     * @return the commit time paired with the records that were written
     * @throws IOException declared by the original signature
     */
    private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String commitTime) throws IOException {
        final String bucket = "bucket-1";
        final String generation = "1";

        try (SparkRDDWriteClient client = getHoodieWriteClient(getWriteConfig())) {
            client.startCommitWithTime(commitTime);

            List<HoodieRecord> records = Arrays.asList(
                    getGcsMetadataRecord(commitTime, "data-file-1.json", bucket, generation),
                    getGcsMetadataRecord(commitTime, "data-file-2.json", bucket, generation),
                    getGcsMetadataRecord(commitTime, "data-file-3.json", bucket, generation),
                    getGcsMetadataRecord(commitTime, "data-file-4.json", bucket, generation));

            JavaRDD writeResult = client.upsert(jsc().parallelize(records, 1), commitTime);
            Assertions.assertNoWriteErrors(writeResult.collect());

            return Pair.of(commitTime, records);
        }
    }

    /**
     * Creates the baseline source properties shared by the tests: the schema provider class, the
     * incremental source path (this test's base path), the missing-checkpoint strategy and a
     * {@code json} data file format.
     *
     * @param missingCheckpointStrategy strategy whose enum name is written into the properties
     * @return a fresh, mutable properties object
     */
    private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
        String schemaProviderClass = FilebasedSchemaProvider.class.getName();
        String strategyName = missingCheckpointStrategy.name();

        TypedProperties sourceProps = new TypedProperties();
        sourceProps.put("hoodie.streamer.schema.provider.class.name", schemaProviderClass);
        sourceProps.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
        sourceProps.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", strategyName);
        sourceProps.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
        return sourceProps;
    }

    /**
     * Starts a write-config builder for the given table: path, the GCS metadata schema, a
     * parallelism of 2 for every operation configured here, the current timeline layout version
     * and the table name read from the meta client.
     *
     * @param basePath table base path
     * @param metaClient meta client supplying the table name
     * @return a builder that callers can extend before calling {@code build()}
     */
    private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
        String tableName = metaClient.getTableConfig().getTableName();
        int layoutVersion = TimelineLayoutVersion.CURR_VERSION.intValue();

        return HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .withSchema(GCS_METADATA_SCHEMA.toString())
                .withParallelism(2, 2)
                .withBulkInsertParallelism(2)
                .withFinalizeWriteParallelism(2)
                .withDeleteParallelism(2)
                .withTimelineLayoutVersion(layoutVersion)
                .forTable(tableName);
    }

    /**
     * Serializes one synthetic GCS event as a JSON object with the keys {@code bucket},
     * {@code name}, {@code size} and {@code _hoodie_commit_time}.
     *
     * @param objectSize value for {@code size}
     * @param bucketName value for {@code bucket}
     * @param objectKey value for {@code name}
     * @param commitTime value for {@code _hoodie_commit_time}
     * @return the JSON string produced by the object mapper
     * @throws JsonProcessingException if serialization fails
     */
    private String generateGCSEventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime) throws JsonProcessingException {
        HashMap<String, Object> event = new HashMap<>();
        event.put("bucket", bucketName);
        event.put("name", objectKey);
        event.put("size", objectSize);
        event.put("_hoodie_commit_time", commitTime);

        String eventJson = this.mapper.writeValueAsString(event);
        return eventJson;
    }

    /**
     * Converts each (path, size, commit time) triple into a JSON event string for bucket
     * {@code bucket-1}, preserving input order.
     *
     * @param filePathSizeAndCommitTime triples of object path, size and commit time
     * @return one JSON string per input triple
     * @throws RuntimeException wrapping any {@link JsonProcessingException} from serialization
     */
    private List<String> getSampleGCSObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        List<String> eventJsons = new ArrayList<>();
        for (Triple<String, Long, String> entry : filePathSizeAndCommitTime) {
            String objectPath = entry.getLeft();
            Long objectSize = entry.getMiddle();
            String commitTime = entry.getRight();
            try {
                eventJsons.add(generateGCSEventMetadata(objectSize, "bucket-1", objectPath, commitTime));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
        return eventJsons;
    }

    /**
     * Turns the given (path, size, commit time) triples into a Spark dataset by serializing them
     * to JSON strings, parallelizing those into two slices and reading them with the JSON reader.
     *
     * @param filePathSizeAndCommitTime triples of object path, size and commit time
     * @return dataset with one row per triple
     */
    private Dataset<Row> generateDataset(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        List<String> eventJsons = getSampleGCSObjectKeys(filePathSizeAndCommitTime);
        JavaRDD<String> eventRdd = this.jsc.parallelize(eventJsons, 2);
        return spark().read().json(eventRdd);
    }
}

