/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector;
import org.json.JSONException;
import org.json.JSONObject;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

public class S3EventsMetaSelector
extends CloudObjectsSelector {
    private static final String S3_EVENT_RESPONSE_ELEMENTS = "responseElements";

    public S3EventsMetaSelector(TypedProperties props) {
        super(props);
    }

    public static S3EventsMetaSelector createSourceSelector(TypedProperties props) {
        String sourceSelectorClass = ConfigUtils.getStringWithAltKeys((TypedProperties)props, DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR, (String)S3EventsMetaSelector.class.getName());
        try {
            S3EventsMetaSelector selector = (S3EventsMetaSelector)ReflectionUtils.loadClass((String)sourceSelectorClass, (Class[])new Class[]{TypedProperties.class}, (Object[])new Object[]{props});
            log.info("Using path selector " + selector.getClass().getName());
            return selector;
        }
        catch (Exception e) {
            throw new HoodieException("Could not load source selector class " + sourceSelectorClass, (Throwable)e);
        }
    }

    protected List<Map<String, Object>> getValidEvents(SqsClient sqs, List<Message> processedMessages) throws IOException {
        List<Message> messages = this.getMessagesToProcess(sqs, this.queueUrl, this.longPollWait, this.visibilityTimeout, this.maxMessagePerBatch, this.maxMessagesPerRequest);
        return this.processAndDeleteInvalidMessages(processedMessages, messages);
    }

    private List<Map<String, Object>> processAndDeleteInvalidMessages(List<Message> processedMessages, List<Message> messages) throws IOException {
        ArrayList<Map<String, Object>> validEvents = new ArrayList<Map<String, Object>>();
        for (Message message : messages) {
            JSONObject messageBody = new JSONObject(message.body());
            ObjectMapper mapper = new ObjectMapper();
            Map messageMap = messageBody.has("Message") ? (Map)mapper.readValue(messageBody.getString("Message"), Map.class) : (Map)mapper.readValue(messageBody.toString(), Map.class);
            if (messageMap.containsKey("Records")) {
                List events = (List)messageMap.get("Records");
                for (Map event : events) {
                    event.remove(S3_EVENT_RESPONSE_ELEMENTS);
                    String eventName = (String)event.get("eventName");
                    if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
                        validEvents.add(event);
                        continue;
                    }
                    log.debug(String.format("This S3 event %s is not allowed, so ignoring it.", eventName));
                }
            } else {
                log.debug(String.format("Message is not expected format or it's s3:TestEvent. Message: %s", message));
            }
            processedMessages.add(message);
        }
        return validEvents;
    }

    public Pair<List<String>, String> getNextEventsFromQueue(SqsClient sqs, Option<String> lastCheckpointStr, List<Message> processedMessages) {
        processedMessages.clear();
        log.info("Reading messages....");
        try {
            log.info("Start Checkpoint : " + lastCheckpointStr);
            List<Map<String, Object>> eventRecords = this.getValidEvents(sqs, processedMessages);
            log.info("Number of valid events: " + eventRecords.size());
            ArrayList<String> filteredEventRecords = new ArrayList<String>();
            long newCheckpointTime = eventRecords.stream().mapToLong(eventRecord -> Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse((String)eventRecord.get("eventTime")))).getTime()).max().orElse((Long)lastCheckpointStr.map(Long::parseLong).orElse((Object)0L));
            for (Map<String, Object> eventRecord2 : eventRecords) {
                filteredEventRecords.add(new ObjectMapper().writeValueAsString(eventRecord2).replace("%3D", "=").replace("%24", "$").replace("%A3", "\u00a3").replace("%23", "#").replace("%26", "&").replace("%3F", "?").replace("%7E", "~").replace("%25", "%").replace("%2B", "+"));
            }
            String newCheckpoint = newCheckpointTime == 0L ? (String)lastCheckpointStr.orElse(null) : String.valueOf(newCheckpointTime);
            return new ImmutablePair(filteredEventRecords, (Object)newCheckpoint);
        }
        catch (IOException | JSONException e) {
            throw new HoodieException("Unable to read from SQS: ", e);
        }
    }
}

