/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(value={MockitoExtension.class})
public class S3EventsHoodieIncrSourceHarness
extends SparkClientFunctionalTestHarness {
    protected static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(S3EventsHoodieIncrSourceHarness.class, (String)"/streamer-config/s3-metadata.avsc", (boolean)true);
    protected ObjectMapper mapper = new ObjectMapper();
    protected static final String MY_BUCKET = "some-bucket";
    protected static final String IGNORE_FILE_EXTENSION = ".ignore";
    protected Option<SchemaProvider> schemaProvider;
    @Mock
    QueryRunner mockQueryRunner;
    @Mock
    CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon;
    @Mock
    SourceProfileSupplier sourceProfileSupplier;
    @Mock
    QueryInfo queryInfo;
    @Mock
    HoodieIngestionMetrics metrics;
    protected JavaSparkContext jsc;
    protected HoodieTableMetaClient metaClient;

    @BeforeEach
    public void setUp() throws IOException {
        this.jsc = JavaSparkContext.fromSparkContext((SparkContext)this.spark().sparkContext());
        String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
        TypedProperties props = new TypedProperties();
        props.put((Object)"hoodie.streamer.schemaprovider.source.schema.file", (Object)schemaFilePath);
        props.put((Object)"hoodie.streamer.schema.provider.class.name", (Object)FilebasedSchemaProvider.class.getName());
        this.schemaProvider = Option.of((Object)new FilebasedSchemaProvider(props, this.jsc));
    }

    protected List<String> getSampleS3ObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        return filePathSizeAndCommitTime.stream().map(f -> {
            try {
                return this.generateS3EventMetadata((Long)f.getMiddle(), MY_BUCKET, (String)f.getLeft(), (String)f.getRight());
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    protected Dataset<Row> generateDataset(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        JavaRDD testRdd = this.jsc.parallelize(this.getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
        Dataset inputDs = this.spark().read().json(testRdd);
        return inputDs;
    }

    protected String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime) throws JsonProcessingException {
        HashMap<String, Object> objectMetadata = new HashMap<String, Object>();
        objectMetadata.put("size", objectSize);
        objectMetadata.put("key", objectKey);
        HashMap<String, String> bucketMetadata = new HashMap<String, String>();
        bucketMetadata.put("name", bucketName);
        HashMap<String, HashMap<String, Object>> s3Metadata = new HashMap<String, HashMap<String, Object>>();
        s3Metadata.put("object", objectMetadata);
        s3Metadata.put("bucket", bucketMetadata);
        HashMap<String, Object> eventMetadata = new HashMap<String, Object>();
        eventMetadata.put("s3", s3Metadata);
        eventMetadata.put("_hoodie_commit_time", commitTime);
        return this.mapper.writeValueAsString(eventMetadata);
    }

    protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
        String partitionPath = bucketName;
        Schema schema = S3_METADATA_SCHEMA;
        GenericData.Record rec = new GenericData.Record(schema);
        Schema.Field s3Field = schema.getField("s3");
        Schema s3Schema = (Schema)s3Field.schema().getTypes().get(1);
        GenericData.Record s3Record = new GenericData.Record(s3Schema);
        Schema.Field s3BucketField = s3Schema.getField("bucket");
        Schema s3Bucket = (Schema)s3BucketField.schema().getTypes().get(1);
        GenericData.Record s3BucketRec = new GenericData.Record(s3Bucket);
        s3BucketRec.put("name", (Object)bucketName);
        Schema.Field s3ObjectField = s3Schema.getField("object");
        Schema s3Object = (Schema)s3ObjectField.schema().getTypes().get(1);
        GenericData.Record s3ObjectRec = new GenericData.Record(s3Object);
        s3ObjectRec.put("key", (Object)objectKey);
        s3ObjectRec.put("size", (Object)objectSize);
        s3Record.put("bucket", (Object)s3BucketRec);
        s3Record.put("object", (Object)s3ObjectRec);
        rec.put("s3", (Object)s3Record);
        rec.put("_hoodie_commit_time", (Object)commitTime);
        HoodieAvroPayload payload = new HoodieAvroPayload(Option.of((Object)rec));
        return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), (HoodieRecordPayload)payload);
    }

    protected TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
        TypedProperties properties = new TypedProperties();
        properties.setProperty("hoodie.streamer.source.hoodieincr.path", this.basePath());
        properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
        properties.setProperty("hoodie.streamer.source.hoodieincr.file.format", "json");
        return properties;
    }

    protected HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(S3_METADATA_SCHEMA.toString()).withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).forTable(metaClient.getTableConfig().getTableName());
    }

    protected HoodieWriteConfig getWriteConfig() {
        return this.getConfigBuilder(this.basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).build();
    }

    protected Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
        HoodieWriteConfig writeConfig = this.getWriteConfig();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(writeConfig);){
            writeClient.startCommitWithTime(commitTime);
            List<HoodieRecord> s3MetadataRecords = Arrays.asList(this.generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L));
            JavaRDD result = writeClient.upsert(this.jsc().parallelize(s3MetadataRecords, 1), commitTime);
            List statuses = result.collect();
            Assertions.assertNoWriteErrors((List)statuses);
            Pair pair = Pair.of((Object)commitTime, s3MetadataRecords);
            return pair;
        }
    }

    protected void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint, TypedProperties typedProperties) {
        S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, this.jsc(), this.spark(), this.mockQueryRunner, new CloudDataFetcher(typedProperties, this.jsc(), this.spark(), this.metrics, this.mockCloudObjectsSelectorCommon), (StreamContext)new DefaultStreamContext((SchemaProvider)this.schemaProvider.orElse(null), Option.of((Object)this.sourceProfileSupplier)));
        Pair dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull.isPresent() ? Option.of((Object)new StreamerCheckpointV1((String)checkpointToPull.get())) : Option.empty(), sourceLimit);
        Option datasetOpt = (Option)dataAndCheckpoint.getLeft();
        Checkpoint nextCheckPoint = (Checkpoint)dataAndCheckpoint.getRight();
        org.junit.jupiter.api.Assertions.assertNotNull((Object)nextCheckPoint);
        org.junit.jupiter.api.Assertions.assertEquals((Object)new StreamerCheckpointV1(expectedCheckpoint), (Object)nextCheckPoint);
    }

    protected void setMockQueryRunner(Dataset<Row> inputDs) {
        this.setMockQueryRunner(inputDs, (Option<String>)Option.empty());
    }

    protected void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
        Mockito.when((Object)this.mockQueryRunner.run((QueryInfo)Mockito.any(QueryInfo.class), (Option)Mockito.any())).thenAnswer(invocation -> {
            QueryInfo queryInfo = (QueryInfo)invocation.getArgument(0);
            QueryInfo updatedQueryInfo = (QueryInfo)nextCheckPointOpt.map(nextCheckPoint -> queryInfo.withUpdatedEndInstant(nextCheckPoint)).orElse((Object)queryInfo);
            if (updatedQueryInfo.isSnapshot()) {
                return Pair.of((Object)updatedQueryInfo, (Object)inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, updatedQueryInfo.getStartInstant())).filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, updatedQueryInfo.getEndInstant())));
            }
            return Pair.of((Object)updatedQueryInfo, (Object)inputDs);
        });
    }

    protected void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
        TypedProperties typedProperties = this.setProps(missingCheckpointStrategy);
        this.readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
    }

    static class TestSourceProfile
    implements SourceProfile<Long> {
        private final long maxSourceBytes;
        private final int sourcePartitions;
        private final long bytesPerPartition;

        public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) {
            this.maxSourceBytes = maxSourceBytes;
            this.sourcePartitions = sourcePartitions;
            this.bytesPerPartition = bytesPerPartition;
        }

        public long getMaxSourceBytes() {
            return this.maxSourceBytes;
        }

        public int getSourcePartitions() {
            return this.sourcePartitions;
        }

        public Long getSourceSpecificContext() {
            return this.bytesPerPartition;
        }
    }
}

