/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class S3EventsSource
extends RowSource {
    private final S3EventsMetaSelector pathSelector;
    private final List<Message> processedMessages = new ArrayList<Message>();
    AmazonSQS sqs;

    public S3EventsSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
        this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
        this.sqs = this.pathSelector.createAmazonSqsClient();
    }

    @Override
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        Pair<List<String>, String> selectPathsWithLatestSqsMessage = this.pathSelector.getNextEventsFromQueue(this.sqs, lastCkptStr, this.processedMessages);
        if (selectPathsWithLatestSqsMessage.getLeft().isEmpty()) {
            return Pair.of(Option.empty(), selectPathsWithLatestSqsMessage.getRight());
        }
        Dataset eventRecords = this.sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), Encoders.STRING());
        return Pair.of(Option.of(this.sparkSession.read().json(eventRecords)), selectPathsWithLatestSqsMessage.getRight());
    }

    @Override
    public void onCommit(String lastCkptStr) {
        this.pathSelector.deleteProcessedMessages(this.sqs, this.pathSelector.queueUrl, this.processedMessages);
        this.processedMessages.clear();
    }
}

