/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.hudi.utilities.config.S3SourceConfig;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/**
 * Helper that pulls S3 event notifications out of an SQS queue and turns them into
 * file-level attributes (path, size, event time).
 *
 * <p>All queue settings (URL, region, file-system scheme, polling and batching limits) are
 * read once from the supplied {@link TypedProperties} in the constructor and exposed as
 * final fields.
 */
public class CloudObjectsSelector {
    /** Event-name prefixes considered relevant; not referenced inside this class itself. */
    public static final List<String> ALLOWED_S3_EVENT_PREFIX = Collections.singletonList("ObjectCreated");
    public static final String S3_PREFIX = "s3://";
    /** Shared logger. Left public, static and non-final to preserve the existing interface. */
    public static volatile Logger log = LoggerFactory.getLogger(CloudObjectsSelector.class);
    /** Name of the SQS queue attribute holding the approximate number of visible messages. */
    public static final String SQS_ATTR_APPROX_MESSAGES = "ApproximateNumberOfMessages";
    static final String SQS_MODEL_MESSAGE = "Message";
    static final String SQS_MODEL_EVENT_RECORDS = "Records";
    static final String SQS_MODEL_EVENT_NAME = "eventName";
    /** JSON field of the event record and output-map key for the event timestamp. */
    static final String S3_MODEL_EVENT_TIME = "eventTime";
    /** Output-map key for the object size. */
    static final String S3_FILE_SIZE = "fileSize";
    /** Output-map key for the fully qualified object path. */
    static final String S3_FILE_PATH = "filePath";
    /**
     * Number of messages sent per delete-batch request by {@link #deleteProcessedMessages}.
     * SQS DeleteMessageBatch accepts at most 10 entries per request.
     */
    private static final int DELETE_BATCH_SIZE = 10;

    public final String queueUrl;
    public final int longPollWait;
    public final int maxMessagePerBatch;
    public final int maxMessagesPerRequest;
    public final int visibilityTimeout;
    public final TypedProperties props;
    public final String fsName;
    private final String regionName;

    /**
     * Reads all queue-related settings from {@code props}.
     *
     * <p>The queue URL and queue region are validated as required properties before any
     * value is read; the remaining settings are looked up through {@link ConfigUtils}.
     *
     * @param props configuration holding the {@link S3SourceConfig} settings
     */
    public CloudObjectsSelector(TypedProperties props) {
        ConfigUtils.checkRequiredConfigProperties(props, Arrays.asList(S3SourceConfig.S3_SOURCE_QUEUE_URL, S3SourceConfig.S3_SOURCE_QUEUE_REGION));
        this.props = props;
        this.queueUrl = ConfigUtils.getStringWithAltKeys(props, S3SourceConfig.S3_SOURCE_QUEUE_URL);
        this.regionName = ConfigUtils.getStringWithAltKeys(props, S3SourceConfig.S3_SOURCE_QUEUE_REGION);
        this.fsName = ConfigUtils.getStringWithAltKeys((Properties)props, S3SourceConfig.S3_SOURCE_QUEUE_FS, true);
        this.longPollWait = ConfigUtils.getIntWithAltKeys(props, S3SourceConfig.S3_QUEUE_LONG_POLL_WAIT);
        this.maxMessagePerBatch = ConfigUtils.getIntWithAltKeys(props, S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH);
        this.maxMessagesPerRequest = ConfigUtils.getIntWithAltKeys(props, S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST);
        this.visibilityTimeout = ConfigUtils.getIntWithAltKeys(props, S3SourceConfig.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT);
    }

    /**
     * Fetches the {@value #SQS_ATTR_APPROX_MESSAGES} attribute of a queue.
     *
     * @param sqsClient client used to issue the request
     * @param queueUrl  URL of the queue to inspect
     * @return the attributes returned by SQS, keyed by attribute name
     */
    protected Map<String, String> getSqsQueueAttributes(SqsClient sqsClient, String queueUrl) {
        GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
                .queueUrl(queueUrl)
                .attributeNames(QueueAttributeName.fromValue(SQS_ATTR_APPROX_MESSAGES))
                .build();
        GetQueueAttributesResponse queueAttributesResult = sqsClient.getQueueAttributes(request);
        return queueAttributesResult.attributesAsStrings();
    }

    /**
     * Extracts the file attributes from a single S3 event record.
     *
     * <p>The record must contain an {@code eventTime} string in ISO-8601 instant format and
     * an {@code s3} object with {@code bucket.name}, {@code object.key} and
     * {@code object.size}. Bucket name and key are URL-decoded as UTF-8.
     *
     * @param record one S3 event record
     * @return a map with {@code eventTime} (epoch milliseconds, {@code Long}),
     *         {@code fileSize} ({@code Long}) and {@code filePath}
     *         ({@code <fsName>://<bucket>/<key>})
     * @throws UnsupportedEncodingException declared by {@link URLDecoder#decode(String, String)}
     */
    protected Map<String, Object> getFileAttributesFromRecord(JSONObject record) throws UnsupportedEncodingException {
        Map<String, Object> fileRecord = new HashMap<>();
        String eventTimeStr = record.getString(S3_MODEL_EVENT_TIME);
        // Instant.parse uses DateTimeFormatter.ISO_INSTANT, so no java.util.Date detour is needed.
        long eventTime = Instant.parse(eventTimeStr).toEpochMilli();
        JSONObject s3 = record.getJSONObject("s3");
        JSONObject s3Object = s3.getJSONObject("object");
        String bucket = URLDecoder.decode(s3.getJSONObject("bucket").getString("name"), "UTF-8");
        String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
        String filePath = this.fsName + "://" + bucket + "/" + key;
        fileRecord.put(S3_MODEL_EVENT_TIME, eventTime);
        fileRecord.put(S3_FILE_SIZE, s3Object.getLong("size"));
        fileRecord.put(S3_FILE_PATH, filePath);
        return fileRecord;
    }

    /**
     * Builds a new SQS client bound to the region configured for this selector.
     *
     * @return a newly built {@link SqsClient}
     */
    public SqsClient createAmazonSqsClient() {
        return SqsClient.builder().region(Region.of(this.regionName)).build();
    }

    /**
     * Receives messages from the queue, issuing as many receive requests as needed to cover
     * {@code min(approximate queue depth, maxMessagePerBatch)} messages at
     * {@code maxMessagesPerRequest} messages per request. Polling stops early as soon as a
     * request returns no messages.
     *
     * <p>Because the number of requests is rounded up, the returned list may contain more
     * than {@code maxMessagePerBatch} messages.
     *
     * @param sqsClient             client used for all requests
     * @param queueUrl              URL of the queue to read
     * @param longPollWait          wait time in seconds passed to each receive request
     * @param visibilityTimeout     visibility timeout passed to each receive request
     * @param maxMessagePerBatch    upper bound used when computing the number of requests
     * @param maxMessagesPerRequest maximum number of messages asked for per request
     * @return the received messages, in the order they were returned
     * @throws NumberFormatException if the approximate-message-count attribute is missing
     *                               or not a number
     */
    protected List<Message> getMessagesToProcess(SqsClient sqsClient, String queueUrl, int longPollWait, int visibilityTimeout, int maxMessagePerBatch, int maxMessagesPerRequest) {
        List<Message> messagesToProcess = new ArrayList<>();
        ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .waitTimeSeconds(longPollWait)
                .visibilityTimeout(visibilityTimeout)
                .maxNumberOfMessages(maxMessagesPerRequest)
                .build();
        Map<String, String> queueAttributesResult = this.getSqsQueueAttributes(sqsClient, queueUrl);
        long approxMessagesAvailable = Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES));
        log.info("Approximately {} messages available in queue.", approxMessagesAvailable);
        long numMessagesToProcess = Math.min(approxMessagesAvailable, (long)maxMessagePerBatch);
        // The request count does not change while polling, so compute it once up front.
        final int requestCount = (int)Math.ceil((double)numMessagesToProcess / (double)maxMessagesPerRequest);
        for (int i = 0; i < requestCount; ++i) {
            List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
            log.debug("Number of messages: {}", messages.size());
            if (messages.isEmpty()) {
                // An empty response means nothing more is currently visible; stop polling.
                break;
            }
            messagesToProcess.addAll(messages);
        }
        return messagesToProcess;
    }

    /**
     * Splits a list into consecutive sub-lists of at most {@code eachBatchSize} elements.
     * Each partition is an independent copy, not a view of {@code singleList}.
     *
     * @param singleList    list to split
     * @param eachBatchSize maximum size of each partition
     * @return the partitions in order; empty when {@code singleList} is empty or
     *         {@code eachBatchSize} is less than 1
     */
    protected List<List<Message>> createListPartitions(List<Message> singleList, int eachBatchSize) {
        List<List<Message>> listPartitions = new ArrayList<>();
        if (singleList.isEmpty() || eachBatchSize < 1) {
            return listPartitions;
        }
        final int total = singleList.size();
        int start = 0;
        while (start < total) {
            // Clamp by the remaining element count so that start + eachBatchSize can never
            // overflow int, regardless of how large eachBatchSize is.
            int end = start + Math.min(eachBatchSize, total - start);
            listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
            start = end;
        }
        return listPartitions;
    }

    /**
     * Deletes the given messages from the queue with a single batch request and logs
     * whether any individual deletion failed. Does nothing for an empty list.
     *
     * @param sqs                 client used to issue the delete request
     * @param queueUrl            URL of the queue the messages were received from
     * @param messagesToBeDeleted messages to delete; sent as one batch
     */
    protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl, List<Message> messagesToBeDeleted) {
        if (messagesToBeDeleted.isEmpty()) {
            return;
        }
        DeleteMessageBatchRequest.Builder builder = DeleteMessageBatchRequest.builder().queueUrl(queueUrl);
        List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(messagesToBeDeleted.size());
        for (Message message : messagesToBeDeleted) {
            deleteEntries.add(DeleteMessageBatchRequestEntry.builder()
                    .id(message.messageId())
                    .receiptHandle(message.receiptHandle())
                    .build());
        }
        builder.entries(deleteEntries);
        DeleteMessageBatchResponse deleteResponse = sqs.deleteMessageBatch(builder.build());
        List<String> deleteFailures = deleteResponse.failed().stream()
                .map(BatchResultErrorEntry::id)
                .collect(Collectors.toList());
        if (!deleteFailures.isEmpty()) {
            log.warn("Failed to delete {} messages out of {} from queue.", deleteFailures.size(), deleteEntries.size());
        } else {
            log.info("Successfully deleted {} messages from queue.", deleteEntries.size());
        }
    }

    /**
     * Deletes already-processed messages from the queue, splitting them into batches of
     * {@value #DELETE_BATCH_SIZE} and issuing one delete request per batch.
     *
     * @param sqs               client used to issue the delete requests
     * @param queueUrl          URL of the queue the messages were received from
     * @param processedMessages messages to delete; an empty list is a no-op
     */
    public void deleteProcessedMessages(SqsClient sqs, String queueUrl, List<Message> processedMessages) {
        if (processedMessages.isEmpty()) {
            return;
        }
        for (List<Message> deleteBatch : this.createListPartitions(processedMessages, DELETE_BATCH_SIZE)) {
            this.deleteBatchOfMessages(sqs, queueUrl, deleteBatch);
        }
    }

    /**
     * Deprecated string aliases for the configuration keys now defined on
     * {@link S3SourceConfig} and {@link DFSPathSelectorConfig}.
     */
    public static class Config {
        @Deprecated
        public static final String S3_SOURCE_QUEUE_URL = S3SourceConfig.S3_SOURCE_QUEUE_URL.key();
        @Deprecated
        public static final String S3_SOURCE_QUEUE_REGION = S3SourceConfig.S3_SOURCE_QUEUE_REGION.key();
        @Deprecated
        public static final String S3_SOURCE_QUEUE_FS = S3SourceConfig.S3_SOURCE_QUEUE_FS.key();
        @Deprecated
        public static final String S3_QUEUE_LONG_POLL_WAIT = S3SourceConfig.S3_QUEUE_LONG_POLL_WAIT.key();
        @Deprecated
        public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH = S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH.key();
        @Deprecated
        public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST = S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST.key();
        @Deprecated
        public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT = S3SourceConfig.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT.key();
        @Deprecated
        public static final String SOURCE_INPUT_SELECTOR = DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR.key();
    }
}

