/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.operations;

import io.awspring.cloud.sqs.FifoUtils;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.QueueAttributesResolver;
import io.awspring.cloud.sqs.SqsAcknowledgementException;
import io.awspring.cloud.sqs.listener.QueueAttributes;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
import io.awspring.cloud.sqs.operations.AbstractMessagingTemplate;
import io.awspring.cloud.sqs.operations.SendResult;
import io.awspring.cloud.sqs.operations.SqsAsyncOperations;
import io.awspring.cloud.sqs.operations.SqsOperations;
import io.awspring.cloud.sqs.operations.SqsReceiveOptions;
import io.awspring.cloud.sqs.operations.SqsSendOptions;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.operations.SqsTemplateOptions;
import io.awspring.cloud.sqs.operations.TemplateContentBasedDeduplication;
import io.awspring.cloud.sqs.support.converter.MessageConversionContext;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessageConversionContext;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeNameForSends;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class SqsTemplate
extends AbstractMessagingTemplate<Message>
implements SqsOperations,
SqsAsyncOperations {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(SqsTemplate.class);
    /** Observation specifics instance handed to the superclass constructor. */
    private static final SqsTemplateObservation.SqsSpecifics SQS_OBSERVATION_SPECIFICS = new SqsTemplateObservation.SqsSpecifics();
    /** Pending or resolved queue attributes, populated and read by {@code getQueueAttributes}. */
    private final Map<String, CompletableFuture<QueueAttributes>> queueAttributesCache = new ConcurrentHashMap<String, CompletableFuture<QueueAttributes>>();
    /** Receive conversion contexts, populated lazily by {@code getReceiveMessageConversionContext}. */
    private final Map<String, SqsMessageConversionContext> conversionContextCache = new ConcurrentHashMap<String, SqsMessageConversionContext>();
    /** Client used for every send, receive and delete call issued by this template. */
    private final SqsAsyncClient sqsAsyncClient;
    /** Queue attribute names requested when resolving queue attributes. */
    private final Collection<QueueAttributeName> queueAttributeNames;
    /** Strategy passed to the queue attributes resolver. */
    private final QueueNotFoundStrategy queueNotFoundStrategy;
    /** Message attribute names requested on every receive call. */
    private final Collection<String> messageAttributeNames;
    /** Message system attribute names requested on every receive call. */
    private final Collection<String> messageSystemAttributeNames;
    /** Decides whether a random deduplication id is added when sending to FIFO queues. */
    private final TemplateContentBasedDeduplication contentBasedDeduplication;

    /**
     * Creates a template from the state collected by the builder.
     *
     * @param builder the builder holding the client, the message converter and the options
     */
    private SqsTemplate(SqsTemplateBuilderImpl builder) {
        super(builder.messageConverter, builder.options, SQS_OBSERVATION_SPECIFICS);
        this.sqsAsyncClient = builder.sqsAsyncClient;
        // Copy the option values so later builder changes cannot affect this template's fields.
        SqsTemplateOptionsImpl templateOptions = builder.options;
        this.queueNotFoundStrategy = templateOptions.queueNotFoundStrategy;
        this.queueAttributeNames = templateOptions.queueAttributeNames;
        this.messageAttributeNames = templateOptions.messageAttributeNames;
        this.messageSystemAttributeNames = templateOptions.messageSystemAttributeNames;
        this.contentBasedDeduplication = templateOptions.contentBasedDeduplication;
    }

    /**
     * Creates a new builder for configuring and creating a {@link SqsTemplate}.
     *
     * @return a fresh builder instance
     */
    public static SqsTemplateBuilder builder() {
        return new SqsTemplateBuilderImpl();
    }

    /**
     * Creates a template with default options and the default message converter.
     *
     * @param sqsAsyncClient the client the template uses; must not be null
     * @return the new template
     */
    public static SqsTemplate newTemplate(SqsAsyncClient sqsAsyncClient) {
        SqsTemplateBuilder templateBuilder = builder();
        return templateBuilder.sqsAsyncClient(sqsAsyncClient).build();
    }

    /**
     * Creates a default template exposed through the synchronous {@link SqsOperations} view.
     *
     * @param sqsAsyncClient the client the template uses
     * @return the new template, typed as {@link SqsOperations}
     */
    public static SqsOperations newSyncTemplate(SqsAsyncClient sqsAsyncClient) {
        SqsTemplate template = newTemplate(sqsAsyncClient);
        return template;
    }

    /**
     * Creates a default template exposed through the asynchronous {@link SqsAsyncOperations} view.
     *
     * @param sqsAsyncClient the client the template uses
     * @return the new template, typed as {@link SqsAsyncOperations}
     */
    public static SqsAsyncOperations newAsyncTemplate(SqsAsyncClient sqsAsyncClient) {
        SqsTemplate template = newTemplate(sqsAsyncClient);
        return template;
    }

    /**
     * Synchronous counterpart of {@link #sendAsync(Consumer)}: the async result is passed through
     * {@code unwrapCompletionException} to obtain the value.
     *
     * @param to callback that fills in the send options
     * @return the result of the send operation
     */
    @Override
    public <T> SendResult<T> send(Consumer<SqsSendOptions<T>> to) {
        CompletableFuture<SendResult<T>> pendingResult = this.sendAsync(to);
        return this.unwrapCompletionException(pendingResult);
    }

    /**
     * Sends a message described by the given options callback.
     *
     * @param to callback that fills in queue, payload, headers and FIFO-related values; must not be null
     * @return a future with the send result
     */
    @Override
    public <T> CompletableFuture<SendResult<T>> sendAsync(Consumer<SqsSendOptions<T>> to) {
        Assert.notNull(to, "to must not be null");
        SqsSendOptionsImpl<T> sendOptions = new SqsSendOptionsImpl<T>();
        to.accept(sendOptions);
        org.springframework.messaging.Message<T> messageToSend = this.messageFromSendOptions(sendOptions);
        return this.sendAsync(sendOptions.queue, messageToSend);
    }

    /**
     * Builds the Spring message to send from the collected send options.
     * <p>
     * Delay, deduplication id and group id are written as headers only when they were set.
     *
     * @param options the populated send options; the payload must not be null
     * @return the message carrying the payload and all configured headers
     */
    private <T> org.springframework.messaging.Message<T> messageFromSendOptions(SqsSendOptionsImpl<T> options) {
        Assert.notNull(options.payload, "payload must not be null");
        MessageBuilder<T> messageBuilder = MessageBuilder.withPayload(options.payload);
        messageBuilder.copyHeaders(options.headers);
        if (options.delay != null) {
            messageBuilder.setHeader("Sqs_Delay", options.delay);
        }
        if (options.messageDeduplicationId != null) {
            messageBuilder.setHeader("Sqs_Msa_MessageDeduplicationId", options.messageDeduplicationId);
        }
        if (options.messageGroupId != null) {
            messageBuilder.setHeader("Sqs_Msa_MessageGroupId", options.messageGroupId);
        }
        return messageBuilder.build();
    }

    /**
     * Synchronous counterpart of {@link #receiveAsync(Consumer)}.
     *
     * @param from callback that fills in the receive options
     * @return the received message, if any
     */
    @Override
    public Optional<org.springframework.messaging.Message<?>> receive(Consumer<SqsReceiveOptions> from) {
        CompletableFuture<Optional<org.springframework.messaging.Message<?>>> pendingMessage = this.receiveAsync(from);
        return this.unwrapCompletionException(pendingMessage);
    }

    /**
     * Synchronous counterpart of {@link #receiveAsync(Consumer, Class)}.
     *
     * @param from callback that fills in the receive options
     * @param payloadClass the class the payload is converted to
     * @return the received message, if any
     */
    @Override
    public <T> Optional<org.springframework.messaging.Message<T>> receive(Consumer<SqsReceiveOptions> from, Class<T> payloadClass) {
        CompletableFuture<Optional<org.springframework.messaging.Message<T>>> pendingMessage = this.receiveAsync(from, payloadClass);
        return this.unwrapCompletionException(pendingMessage);
    }

    /**
     * Synchronous counterpart of {@link #receiveManyAsync(Consumer)}.
     *
     * @param from callback that fills in the receive options
     * @return the received messages
     */
    @Override
    public Collection<org.springframework.messaging.Message<?>> receiveMany(Consumer<SqsReceiveOptions> from) {
        CompletableFuture<Collection<org.springframework.messaging.Message<?>>> pendingMessages = this.receiveManyAsync(from);
        return this.unwrapCompletionException(pendingMessages);
    }

    /**
     * Synchronous counterpart of {@link #receiveManyAsync(Consumer, Class)}.
     *
     * @param from callback that fills in the receive options
     * @param payloadClass the class the payloads are converted to
     * @return the received messages
     */
    @Override
    public <T> Collection<org.springframework.messaging.Message<T>> receiveMany(Consumer<SqsReceiveOptions> from, Class<T> payloadClass) {
        CompletableFuture<Collection<org.springframework.messaging.Message<T>>> pendingMessages = this.receiveManyAsync(from, payloadClass);
        return this.unwrapCompletionException(pendingMessages);
    }

    /**
     * Receives at most one message using the given options, without a target payload class.
     *
     * @param from callback that fills in the receive options; must not be null
     * @return a future with the received message, if any
     * @throws IllegalArgumentException if {@code maxNumberOfMessages} was set to a value other than 1
     */
    @Override
    public CompletableFuture<Optional<org.springframework.messaging.Message<?>>> receiveAsync(Consumer<SqsReceiveOptions> from) {
        Assert.notNull(from, "from must not be null");
        SqsReceiveOptionsImpl receiveOptions = new SqsReceiveOptionsImpl();
        from.accept(receiveOptions);
        boolean singleMessageRequested = receiveOptions.maxNumberOfMessages == null || receiveOptions.maxNumberOfMessages == 1;
        Assert.isTrue(singleMessageRequested, "maxNumberOfMessages must be null or 1. Use receiveMany to receive more messages.");
        Map<String, Object> receiveHeaders = this.addAdditionalReceiveHeaders(receiveOptions);
        return this.receiveAsync(receiveOptions.queue, null, receiveOptions.pollTimeout, receiveHeaders);
    }

    /**
     * Receives at most one message using the given options and converts its payload to the given class.
     *
     * @param from callback that fills in the receive options; must not be null
     * @param payloadClass the class the payload is converted to; must not be null
     * @return a future with the received message, if any
     * @throws IllegalArgumentException if {@code maxNumberOfMessages} was set to a value other than 1
     */
    @Override
    public <T> CompletableFuture<Optional<org.springframework.messaging.Message<T>>> receiveAsync(Consumer<SqsReceiveOptions> from, Class<T> payloadClass) {
        Assert.notNull(from, "from must not be null");
        Assert.notNull(payloadClass, "payloadClass must not be null");
        SqsReceiveOptionsImpl receiveOptions = new SqsReceiveOptionsImpl();
        from.accept(receiveOptions);
        boolean singleMessageRequested = receiveOptions.maxNumberOfMessages == null || receiveOptions.maxNumberOfMessages == 1;
        Assert.isTrue(singleMessageRequested, "maxNumberOfMessages must be null or 1. Use receiveMany to receive more messages.");
        Map<String, Object> receiveHeaders = this.addAdditionalReceiveHeaders(receiveOptions);
        return this.receiveAsync(receiveOptions.queue, payloadClass, receiveOptions.pollTimeout, receiveHeaders)
                .thenApply(super::castFromOptional);
    }

    /**
     * Receives a batch of messages using the given options, without a target payload class.
     *
     * @param from callback that fills in the receive options; must not be null
     * @return a future with the received messages
     */
    @Override
    public CompletableFuture<Collection<org.springframework.messaging.Message<?>>> receiveManyAsync(Consumer<SqsReceiveOptions> from) {
        Assert.notNull(from, "from must not be null");
        SqsReceiveOptionsImpl receiveOptions = new SqsReceiveOptionsImpl();
        from.accept(receiveOptions);
        Map<String, Object> receiveHeaders = this.addAdditionalReceiveHeaders(receiveOptions);
        return this.receiveManyAsync(receiveOptions.queue, null, receiveOptions.pollTimeout, receiveOptions.maxNumberOfMessages, receiveHeaders);
    }

    /**
     * Receives a batch of messages using the given options and converts the payloads to the given class.
     *
     * @param from callback that fills in the receive options; must not be null
     * @param payloadClass the class the payloads are converted to; must not be null
     * @return a future with the received messages
     */
    @Override
    public <T> CompletableFuture<Collection<org.springframework.messaging.Message<T>>> receiveManyAsync(Consumer<SqsReceiveOptions> from, Class<T> payloadClass) {
        Assert.notNull(from, "from must not be null");
        Assert.notNull(payloadClass, "payloadClass must not be null");
        SqsReceiveOptionsImpl receiveOptions = new SqsReceiveOptionsImpl();
        from.accept(receiveOptions);
        Map<String, Object> receiveHeaders = this.addAdditionalReceiveHeaders(receiveOptions);
        return this.receiveManyAsync(receiveOptions.queue, payloadClass, receiveOptions.pollTimeout, receiveOptions.maxNumberOfMessages, receiveHeaders)
                .thenApply(super::castFromCollection);
    }

    /**
     * Collects the headers that accompany a receive request.
     * <p>
     * Starts from a copy of the user-provided additional headers and adds the visibility timeout and
     * the receive request attempt id when those options were set.
     *
     * @param options the populated receive options
     * @return a new mutable map with all receive headers
     */
    private Map<String, Object> addAdditionalReceiveHeaders(SqsReceiveOptionsImpl options) {
        Map<String, Object> receiveHeaders = new HashMap<>(options.additionalHeaders);
        if (options.visibilityTimeout != null) {
            receiveHeaders.put("Sqs_VisibilityTimeout", options.visibilityTimeout);
        }
        if (options.receiveRequestAttemptId != null) {
            receiveHeaders.put("Sqs_ReceiveRequestAttemptId", options.receiveRequestAttemptId);
        }
        return receiveHeaders;
    }

    /**
     * Returns the message unchanged; FIFO-related header handling lives in
     * {@code preProcessMessageForSendAsync}.
     */
    @Override
    protected <T> org.springframework.messaging.Message<T> preProcessMessageForSend(String endpointToUse, org.springframework.messaging.Message<T> message) {
        return message;
    }

    /**
     * Returns the messages unchanged; FIFO-related header handling lives in
     * {@code preProcessMessagesForSendAsync}.
     */
    @Override
    protected <T> Collection<org.springframework.messaging.Message<T>> preProcessMessagesForSend(String endpointToUse, Collection<org.springframework.messaging.Message<T>> messages) {
        return messages;
    }

    /**
     * Adds missing FIFO headers to a message before it is sent.
     * <p>
     * Non-FIFO endpoints get the message back untouched. For FIFO endpoints a random message group id
     * is added if absent; a random deduplication id is added too unless content-based deduplication
     * is considered enabled for the endpoint.
     *
     * @param endpointToUse the target endpoint name
     * @param message the message about to be sent
     * @return a future with the message to send
     */
    @Override
    protected <T> CompletableFuture<org.springframework.messaging.Message<T>> preProcessMessageForSendAsync(String endpointToUse, org.springframework.messaging.Message<T> message) {
        if (!FifoUtils.isFifo(endpointToUse)) {
            return CompletableFuture.completedFuture(message);
        }
        return this.endpointHasContentBasedDeduplicationEnabled(endpointToUse)
                .thenApply(contentBasedDeduplication -> this.addMissingFifoSendHeaders(message,
                        contentBasedDeduplication ? Map.<String, Object>of() : this.getRandomDeduplicationIdHeader()));
    }

    /**
     * Batch variant of {@code preProcessMessageForSendAsync}.
     * <p>
     * Non-FIFO endpoints get the collection back untouched. For FIFO endpoints each message receives
     * its own random group id if absent, plus its own random deduplication id unless content-based
     * deduplication is considered enabled for the endpoint.
     *
     * @param endpointToUse the target endpoint name
     * @param messages the messages about to be sent
     * @return a future with the messages to send
     */
    @Override
    protected <T> CompletableFuture<Collection<org.springframework.messaging.Message<T>>> preProcessMessagesForSendAsync(String endpointToUse, Collection<org.springframework.messaging.Message<T>> messages) {
        if (!FifoUtils.isFifo(endpointToUse)) {
            return CompletableFuture.completedFuture(messages);
        }
        return this.endpointHasContentBasedDeduplicationEnabled(endpointToUse)
                .thenApply(contentBasedDeduplication -> messages.stream()
                        .map(message -> this.addMissingFifoSendHeaders(message,
                                contentBasedDeduplication ? Map.<String, Object>of() : this.getRandomDeduplicationIdHeader()))
                        .toList());
    }

    /**
     * Adds the given headers plus a random message group id to the message, via
     * {@code MessageHeaderUtils.addHeadersIfAbsent}.
     *
     * @param message the message to enrich
     * @param additionalHeaders extra candidate headers, e.g. a random deduplication id
     * @return the message returned by {@code MessageHeaderUtils.addHeadersIfAbsent}
     */
    private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(org.springframework.messaging.Message<T> message, Map<String, Object> additionalHeaders) {
        Map<String, Object> candidateHeaders = Stream
                .concat(additionalHeaders.entrySet().stream(), this.getRandomMessageGroupIdHeader().entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return MessageHeaderUtils.addHeadersIfAbsent(message, candidateHeaders);
    }

    /**
     * Creates a single-entry map with a freshly generated random message group id.
     *
     * @return an immutable map from the message group id header name to a random UUID string
     */
    private Map<String, String> getRandomMessageGroupIdHeader() {
        String randomGroupId = UUID.randomUUID().toString();
        return Map.of("Sqs_Msa_MessageGroupId", randomGroupId);
    }

    /**
     * Creates a single-entry map with a freshly generated random message deduplication id.
     *
     * @return an immutable map from the deduplication id header name to a random UUID string
     */
    private Map<String, Object> getRandomDeduplicationIdHeader() {
        String randomDeduplicationId = UUID.randomUUID().toString();
        return Map.of("Sqs_Msa_MessageDeduplicationId", randomDeduplicationId);
    }

    /**
     * Determines whether content-based deduplication should be treated as enabled for the endpoint.
     * <p>
     * With the {@code AUTO} setting the answer comes from the queue's attributes; otherwise it is
     * true exactly when the setting is {@code ENABLED}.
     *
     * @param endpointName the endpoint to check
     * @return a future with the decision
     */
    private CompletableFuture<Boolean> endpointHasContentBasedDeduplicationEnabled(String endpointName) {
        if (TemplateContentBasedDeduplication.AUTO.equals(this.contentBasedDeduplication)) {
            return this.handleAutoDeduplication(endpointName);
        }
        boolean explicitlyEnabled = this.contentBasedDeduplication.equals(TemplateContentBasedDeduplication.ENABLED);
        return CompletableFuture.completedFuture(explicitlyEnabled);
    }

    /**
     * Reads the content-based deduplication flag from the queue's attributes.
     *
     * @param endpointName the endpoint whose attributes are consulted
     * @return a future that is true when the attribute value parses as {@code true}
     */
    private CompletableFuture<Boolean> handleAutoDeduplication(String endpointName) {
        return this.getQueueAttributes(endpointName).thenApply(queueAttributes -> {
            String flag = queueAttributes.getQueueAttribute(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
            return Boolean.parseBoolean(flag);
        });
    }

    /**
     * Sends a single, already converted message and maps the SQS response to a {@link SendResult}.
     *
     * @param endpointName the target endpoint
     * @param message the converted SQS message
     * @param originalMessage the Spring message the SQS message was converted from
     * @return a future with the send result
     */
    @Override
    protected <T> CompletableFuture<SendResult<T>> doSendAsync(String endpointName, Message message, org.springframework.messaging.Message<T> originalMessage) {
        return this.createSendMessageRequest(endpointName, message)
                .thenCompose(this.sqsAsyncClient::sendMessage)
                .thenApply(sendResponse -> {
                    UUID assignedMessageId = UUID.fromString(sendResponse.messageId());
                    return this.createSendResult(assignedMessageId, sendResponse.sequenceNumber(), endpointName, originalMessage);
                });
    }

    /**
     * Creates a {@link SendResult} for a successfully sent message.
     *
     * @param messageId the id returned by SQS for the message
     * @param sequenceNumber the sequence number returned by SQS, or null if none was returned
     * @param endpointName the endpoint the message was sent to
     * @param originalMessage the message that was sent
     * @return the send result; its additional information holds the sequence number when present
     */
    private <T> SendResult<T> createSendResult(UUID messageId, @Nullable String sequenceNumber, String endpointName, org.springframework.messaging.Message<T> originalMessage) {
        Map<String, Object> additionalInformation = sequenceNumber != null
                ? Collections.singletonMap("sequenceNumber", sequenceNumber)
                : Collections.emptyMap();
        return new SendResult<T>(messageId, endpointName, originalMessage, additionalInformation);
    }

    /**
     * Builds the send request once the queue attributes of the endpoint are available.
     *
     * @param endpointName the target endpoint
     * @param message the converted SQS message
     * @return a future with the request to send
     */
    private CompletableFuture<SendMessageRequest> createSendMessageRequest(String endpointName, Message message) {
        CompletableFuture<QueueAttributes> attributes = this.getQueueAttributes(endpointName);
        return attributes.thenApply(resolved -> this.doCreateSendMessageRequest(message, resolved));
    }

    /**
     * Maps an SQS message to a {@link SendMessageRequest} for the given queue.
     * <p>
     * Deduplication id and group id are taken from the message's system attributes and set through
     * their dedicated request fields. The delay is taken from the {@code Sqs_Delay} message
     * attribute, which is then left out of the attributes sent along.
     *
     * @param message the converted SQS message
     * @param queueAttributes the resolved attributes of the target queue
     * @return the request to send
     */
    private SendMessageRequest doCreateSendMessageRequest(Message message, QueueAttributes queueAttributes) {
        String deduplicationId = message.attributes().get(MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID);
        String groupId = message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID);
        return SendMessageRequest.builder()
                .queueUrl(queueAttributes.getQueueUrl())
                .messageBody(message.body())
                .messageDeduplicationId(deduplicationId)
                .messageGroupId(groupId)
                .delaySeconds(this.getDelaySeconds(message))
                .messageAttributes(this.excludeKnownFields(message.messageAttributes()))
                .messageSystemAttributes(this.mapMessageSystemAttributes(message))
                .build();
    }

    /**
     * Sends a batch of already converted messages and maps the SQS response to a batch result.
     *
     * @param endpointName the target endpoint
     * @param messages the converted SQS messages
     * @param originalMessages the Spring messages the SQS messages were converted from
     * @return a future with the successful and failed results
     */
    @Override
    protected <T> CompletableFuture<SendResult.Batch<T>> doSendBatchAsync(String endpointName, Collection<Message> messages, Collection<org.springframework.messaging.Message<T>> originalMessages) {
        logger.debug("Sending messages {} to endpoint {}", messages, endpointName);
        return this.createSendMessageBatchRequest(endpointName, messages)
                .thenCompose(this.sqsAsyncClient::sendMessageBatch)
                .thenApply(batchResponse -> {
                    // Index the originals by id so each response entry can be matched back to its message.
                    Map<String, org.springframework.messaging.Message<T>> originalsById = originalMessages.stream()
                            .collect(Collectors.toMap(MessageHeaderUtils::getId, msg -> msg));
                    return this.createSendResultBatch(batchResponse, endpointName, originalsById);
                });
    }

    /**
     * Combines the successful and failed entries of a batch response into one batch result.
     *
     * @param response the SQS batch response
     * @param endpointName the endpoint the batch was sent to
     * @param originalMessagesById the sent messages indexed by their id
     * @return the batch result
     */
    private <T> SendResult.Batch<T> createSendResultBatch(SendMessageBatchResponse response, String endpointName, Map<String, org.springframework.messaging.Message<T>> originalMessagesById) {
        Collection<SendResult<T>> successful = this.doCreateSendResultBatch(response, endpointName, originalMessagesById);
        Collection<SendResult.Failed<T>> failed = this.createSendResultFailed(response, endpointName, originalMessagesById);
        return new SendResult.Batch<T>(successful, failed);
    }

    /**
     * Maps the failed entries of a batch response to failed send results.
     * <p>
     * Each result carries the entry's error message and, as additional information, its
     * {@code senderFault} flag and error {@code code}.
     *
     * @param response the SQS batch response
     * @param endpointName the endpoint the batch was sent to
     * @param originalMessagesById the sent messages indexed by their id
     * @return the failed results, one per failed entry
     */
    private <T> Collection<SendResult.Failed<T>> createSendResultFailed(SendMessageBatchResponse response, String endpointName, Map<String, org.springframework.messaging.Message<T>> originalMessagesById) {
        return response.failed().stream()
                .map(failure -> new SendResult.Failed<T>(
                        failure.message(),
                        endpointName,
                        originalMessagesById.get(failure.id()),
                        Map.of("senderFault", failure.senderFault(), "code", failure.code())))
                .toList();
    }

    /**
     * Maps the successful entries of a batch response to send results.
     *
     * @param response the SQS batch response
     * @param endpointName the endpoint the batch was sent to
     * @param originalMessagesById the sent messages indexed by their id
     * @return the successful results, one per successful entry
     */
    private <T> Collection<SendResult<T>> doCreateSendResultBatch(SendMessageBatchResponse response, String endpointName, Map<String, org.springframework.messaging.Message<T>> originalMessagesById) {
        return response.successful().stream()
                .map(success -> {
                    UUID assignedMessageId = UUID.fromString(success.messageId());
                    org.springframework.messaging.Message<T> original = this.getOriginalMessage(originalMessagesById, success);
                    return this.createSendResult(assignedMessageId, success.sequenceNumber(), endpointName, original);
                })
                .toList();
    }

    /**
     * Finds the originally sent message that a successful batch result entry refers to.
     * <p>
     * Correlation uses the batch entry id ({@code entry.id()}), which is the key of the given map.
     * The failure message therefore reports that same id rather than the message id assigned by
     * SQS, so it can be compared directly with the keys printed alongside it.
     *
     * @param originalMessagesById the sent messages indexed by their id
     * @param entry the successful batch result entry
     * @return the matching original message
     * @throws IllegalArgumentException if no original message is registered under the entry id
     */
    private <T> org.springframework.messaging.Message<T> getOriginalMessage(Map<String, org.springframework.messaging.Message<T>> originalMessagesById, SendMessageBatchResultEntry entry) {
        org.springframework.messaging.Message<T> originalMessage = originalMessagesById.get(entry.id());
        Assert.notNull(originalMessage, () -> "Could not correlate send result to original message for id %s. Original messages: %s.".formatted(entry.id(), originalMessagesById));
        return originalMessage;
    }

    /**
     * Returns the conversion context used to convert messages received from the given endpoint.
     * <p>
     * Contexts are cached per endpoint <em>and</em> payload class. Caching by endpoint alone would
     * make the payload class of the first receive call stick: later calls for the same queue with a
     * different (or no) payload class would get a context configured for the wrong type.
     *
     * @param endpointName the endpoint messages were received from
     * @param payloadClass the class payloads should be converted to, or null for none
     * @return the cached or newly created conversion context
     */
    @Override
    @Nullable
    protected <T> MessageConversionContext getReceiveMessageConversionContext(String endpointName, @Nullable Class<T> payloadClass) {
        // '#' cannot occur in an SQS queue name, so it separates the two key parts unambiguously.
        String cacheKey = payloadClass != null ? endpointName + "#" + payloadClass.getName() : endpointName;
        return this.conversionContextCache.computeIfAbsent(cacheKey, ignoredKey -> this.doGetSqsMessageConversionContext(endpointName, payloadClass));
    }

    /**
     * Creates a conversion context for the given endpoint.
     * <p>
     * The context receives the client, the already resolved queue attributes, a new acknowledgement
     * callback and, when given, the payload class.
     *
     * @param newEndpoint the endpoint the context is created for
     * @param payloadClass the class payloads should be converted to, or null for none
     * @return the configured conversion context
     */
    private <T> SqsMessageConversionContext doGetSqsMessageConversionContext(String newEndpoint, @Nullable Class<T> payloadClass) {
        SqsMessageConversionContext context = new SqsMessageConversionContext();
        context.setSqsAsyncClient(this.sqsAsyncClient);
        QueueAttributes resolvedAttributes = this.getAttributesImmediately(newEndpoint);
        context.setQueueAttributes(resolvedAttributes);
        if (payloadClass != null) {
            context.setPayloadClass(payloadClass);
        }
        context.setAcknowledgementCallback(new TemplateAcknowledgementCallback());
        return context;
    }

    /**
     * Returns the queue attributes of an endpoint whose resolution must already have finished.
     *
     * @param newEndpoint the endpoint name
     * @return the resolved queue attributes
     * @throws IllegalArgumentException if the attributes future is not done yet
     */
    private QueueAttributes getAttributesImmediately(String newEndpoint) {
        CompletableFuture<QueueAttributes> attributesFuture = this.getQueueAttributes(newEndpoint);
        boolean alreadyResolved = attributesFuture.isDone();
        Assert.isTrue(alreadyResolved, () -> "Queue attributes not done for " + newEndpoint);
        return attributesFuture.join();
    }

    /**
     * Builds the batch send request once the queue attributes of the endpoint are available.
     *
     * @param endpointName the target endpoint
     * @param messages the converted SQS messages
     * @return a future with the batch request to send
     */
    private CompletableFuture<SendMessageBatchRequest> createSendMessageBatchRequest(String endpointName, Collection<Message> messages) {
        CompletableFuture<QueueAttributes> attributes = this.getQueueAttributes(endpointName);
        return attributes.thenApply(resolved -> this.doCreateSendMessageBatchRequest(messages, resolved));
    }

    /**
     * Maps SQS messages to a {@link SendMessageBatchRequest} for the given queue, one entry per message.
     *
     * @param messages the converted SQS messages
     * @param queueAttributes the resolved attributes of the target queue
     * @return the batch request to send
     */
    private SendMessageBatchRequest doCreateSendMessageBatchRequest(Collection<Message> messages, QueueAttributes queueAttributes) {
        Collection<SendMessageBatchRequestEntry> batchEntries = messages.stream()
                .map(this::createSendMessageBatchRequestEntry)
                .collect(Collectors.toList());
        return SendMessageBatchRequest.builder()
                .queueUrl(queueAttributes.getQueueUrl())
                .entries(batchEntries)
                .build();
    }

    /**
     * Maps one SQS message to a batch request entry, using the message id as the entry id.
     * <p>
     * Deduplication id, group id and delay are handled as in {@code doCreateSendMessageRequest}.
     *
     * @param message the converted SQS message
     * @return the batch entry for the message
     */
    private SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(Message message) {
        String deduplicationId = message.attributes().get(MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID);
        String groupId = message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID);
        return SendMessageBatchRequestEntry.builder()
                .id(message.messageId())
                .messageBody(message.body())
                .messageDeduplicationId(deduplicationId)
                .messageGroupId(groupId)
                .delaySeconds(this.getDelaySeconds(message))
                .messageAttributes(this.excludeKnownFields(message.messageAttributes()))
                .messageSystemAttributes(this.mapMessageSystemAttributes(message))
                .build();
    }

    /**
     * Copies the message attributes, leaving out the {@code Sqs_Delay} entry, which is sent through
     * the request's delay field instead.
     *
     * @param messageAttributes the attributes of the converted message
     * @return a new map without the delay attribute
     */
    private Map<String, MessageAttributeValue> excludeKnownFields(Map<String, MessageAttributeValue> messageAttributes) {
        return messageAttributes.entrySet().stream()
                .filter(attribute -> {
                    String attributeName = attribute.getKey();
                    return !"Sqs_Delay".equals(attributeName);
                })
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Reads the delay in seconds from the {@code Sqs_Delay} message attribute.
     *
     * @param message the converted SQS message
     * @return the parsed delay, or null when the attribute is not present
     * @throws NumberFormatException if the attribute's string value is not an integer
     */
    @Nullable
    private Integer getDelaySeconds(Message message) {
        Map<String, MessageAttributeValue> attributes = message.messageAttributes();
        if (!attributes.containsKey("Sqs_Delay")) {
            return null;
        }
        String delayText = attributes.get("Sqs_Delay").stringValue();
        return Integer.parseInt(delayText);
    }

    /**
     * Converts the message's system attributes to the send-side representation.
     * <p>
     * Deduplication id and group id are skipped. Every remaining attribute becomes a
     * {@code String}-typed {@link MessageSystemAttributeValue}.
     *
     * @param message the converted SQS message
     * @return the system attributes to put on the send request
     */
    private Map<MessageSystemAttributeNameForSends, MessageSystemAttributeValue> mapMessageSystemAttributes(Message message) {
        return message.attributes().entrySet().stream()
                .filter(attribute -> !this.isSkipAttribute(attribute.getKey()))
                .collect(Collectors.toMap(
                        attribute -> MessageSystemAttributeNameForSends.fromValue(attribute.getKey().toString()),
                        attribute -> MessageSystemAttributeValue.builder()
                                .dataType("String")
                                .stringValue(attribute.getValue())
                                .build()));
    }

    /**
     * Tells whether a system attribute must be left out of the send-side system attributes.
     *
     * @param name the system attribute name
     * @return true for the message deduplication id and the message group id
     */
    private boolean isSkipAttribute(MessageSystemAttributeName name) {
        // Enum constants are singletons, so identity comparison matches equals().
        return name == MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID
                || name == MessageSystemAttributeName.MESSAGE_GROUP_ID;
    }

    /**
     * Returns the (possibly still pending) queue attributes of an endpoint, starting the resolution on
     * first use and caching the resulting future.
     * <p>
     * A future that completes exceptionally is evicted so that a later call retries the resolution.
     * The eviction only removes the entry if the cache still maps the endpoint to this very future.
     * Otherwise a callback registered late on a stale failed future could throw away a newer future
     * another caller has cached in the meantime.
     *
     * @param endpointName the endpoint name
     * @return the cached or newly created attributes future
     */
    private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName) {
        CompletableFuture<QueueAttributes> future = this.queueAttributesCache.computeIfAbsent(endpointName, newName -> this.doGetQueueAttributes(endpointName, newName));
        future.whenComplete((result, throwable) -> {
            // remove(key, value) is a no-op when the entry was already evicted or replaced.
            if (throwable != null && this.queueAttributesCache.remove(endpointName, future)) {
                logger.debug("Removed failed queue attributes from cache for: {}", endpointName);
            }
        });
        return future;
    }

    /**
     * Starts resolving the attributes of a queue through a {@link QueueAttributesResolver}.
     *
     * @param endpointName the endpoint name, used to decide which attribute names to request
     * @param newName the queue name handed to the resolver
     * @return a future with the resolved queue attributes
     */
    private CompletableFuture<QueueAttributes> doGetQueueAttributes(String endpointName, String newName) {
        Collection<QueueAttributeName> attributeNamesToRequest = this.maybeAddContentBasedDeduplicationAttribute(endpointName);
        return QueueAttributesResolver.builder()
                .sqsAsyncClient(this.sqsAsyncClient)
                .queueName(newName)
                .queueNotFoundStrategy(this.queueNotFoundStrategy)
                .queueAttributeNames(attributeNamesToRequest)
                .build()
                .resolveQueueAttributes();
    }

    /**
     * Chooses the queue attribute names to request for an endpoint.
     * <p>
     * For FIFO endpoints under the {@code AUTO} deduplication setting the content-based deduplication
     * attribute is appended; otherwise the configured names are returned as they are.
     *
     * @param endpointName the endpoint name
     * @return the attribute names to request
     */
    private Collection<QueueAttributeName> maybeAddContentBasedDeduplicationAttribute(String endpointName) {
        boolean needsDeduplicationAttribute = FifoUtils.isFifo(endpointName)
                && TemplateContentBasedDeduplication.AUTO.equals(this.contentBasedDeduplication);
        if (!needsDeduplicationAttribute) {
            return this.queueAttributeNames;
        }
        return Stream.concat(this.queueAttributeNames.stream(), Stream.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION)).toList();
    }

    /**
     * Returns a copy of the additional headers without the visibility timeout and the receive request
     * attempt id entries.
     *
     * @param additionalHeaders the headers used for the receive request
     * @return a new map with the remaining headers
     */
    @Override
    protected Map<String, Object> handleAdditionalHeaders(Map<String, Object> additionalHeaders) {
        Map<String, Object> remainingHeaders = new HashMap<>(additionalHeaders);
        remainingHeaders.remove("Sqs_ReceiveRequestAttemptId");
        remainingHeaders.remove("Sqs_VisibilityTimeout");
        return remainingHeaders;
    }

    /**
     * Acknowledges the given messages by delegating to {@code deleteMessages}.
     *
     * @param endpointName the endpoint the messages were received from
     * @param messages the messages to acknowledge
     * @return the future returned by {@code deleteMessages}
     */
    @Override
    protected CompletableFuture<Void> doAcknowledgeMessages(String endpointName, Collection<org.springframework.messaging.Message<?>> messages) {
        return this.deleteMessages(endpointName, messages);
    }

    /**
     * Issues a receive request against the endpoint and returns the raw SQS messages.
     *
     * @param endpointName the endpoint to receive from
     * @param pollTimeout how long the request waits for messages
     * @param maxNumberOfMessages the maximum number of messages to receive
     * @param additionalHeaders receive headers, possibly holding visibility timeout and attempt id
     * @return a future with the received SQS messages
     */
    @Override
    protected CompletableFuture<Collection<Message>> doReceiveAsync(String endpointName, Duration pollTimeout, Integer maxNumberOfMessages, Map<String, Object> additionalHeaders) {
        logger.trace("Receiving messages with settings: endpointName - {}, pollTimeout - {}, maxNumberOfMessages - {}, additionalHeaders - {}", endpointName, pollTimeout, maxNumberOfMessages, additionalHeaders);
        return this.createReceiveMessageRequest(endpointName, pollTimeout, maxNumberOfMessages, additionalHeaders)
                .thenCompose(this.sqsAsyncClient::receiveMessage)
                .thenApply(ReceiveMessageResponse::messages);
    }

    /**
     * Adds missing FIFO receive headers for FIFO endpoints; other endpoints keep their headers as is.
     *
     * @param endpointToUse the endpoint to receive from
     * @param headers the receive headers
     * @return the headers to use for the receive request
     */
    @Override
    protected Map<String, Object> preProcessHeadersForReceive(String endpointToUse, Map<String, Object> headers) {
        if (FifoUtils.isFifo(endpointToUse)) {
            return this.addMissingFifoReceiveHeaders(headers);
        }
        return headers;
    }

    /**
     * Puts a random receive request attempt id into the given map unless one is already present.
     * <p>
     * The map is modified in place and returned.
     *
     * @param headers the receive headers; must be mutable
     * @return the same map instance
     */
    private Map<String, Object> addMissingFifoReceiveHeaders(Map<String, Object> headers) {
        UUID generatedAttemptId = UUID.randomUUID();
        headers.putIfAbsent("Sqs_ReceiveRequestAttemptId", generatedAttemptId);
        return headers;
    }

    /**
     * Deletes the given messages from the queue in one batch request.
     * <p>
     * The returned future fails with an {@link SqsAcknowledgementException} when the request itself
     * fails (all messages reported as failed) or when the response lists failed entries (messages
     * split into successful and failed). The outcome is logged in every case.
     *
     * @param endpointName the endpoint the messages were received from
     * @param messages the messages to delete
     * @return a future that completes when all messages were deleted
     */
    private CompletableFuture<Void> deleteMessages(String endpointName, Collection<org.springframework.messaging.Message<?>> messages) {
        logger.trace("Acknowledging in queue {} messages {}", endpointName, MessageHeaderUtils.getId(this.addTypeToMessages(messages)));

        // Stage 1: resolve the queue URL and issue the batch delete.
        CompletableFuture<DeleteMessageBatchResponse> deletion = this.getQueueAttributes(endpointName)
                .thenCompose(attributes -> this.sqsAsyncClient.deleteMessageBatch(DeleteMessageBatchRequest.builder()
                        .queueUrl(attributes.getQueueUrl())
                        .entries(this.createDeleteMessageEntries(messages))
                        .build()));

        // Stage 2: a failure of the request means none of the messages was acknowledged.
        CompletableFuture<DeleteMessageBatchResponse> wrapped = deletion.exceptionallyCompose(
                t -> this.createAcknowledgementException(endpointName, Collections.emptyList(), messages, t));

        // Stage 3: a response with failed entries is turned into a failure as well.
        CompletableFuture<DeleteMessageBatchResponse> verified = wrapped.thenCompose(response -> {
            if (response.failed().isEmpty()) {
                return CompletableFuture.completedFuture(response);
            }
            return this.createAcknowledgementException(endpointName,
                    this.getSuccessfulAckMessages(response, messages, endpointName),
                    this.getFailedAckMessages(response, messages, endpointName), null);
        });

        return verified
                .whenComplete((response, t) -> this.logAcknowledgement(endpointName, messages, response, t))
                .thenRun(() -> {});
    }

    /**
     * Finds the messages that the batch delete response reports as failed.
     *
     * @param response the batch delete response
     * @param messages the messages that were part of the request
     * @param endpointName the endpoint the messages belong to
     * @return the messages whose ids appear in the failed entries
     * @throws SqsAcknowledgementException if a failed entry id matches none of the messages
     */
    private Collection<org.springframework.messaging.Message<?>> getFailedAckMessages(DeleteMessageBatchResponse response, Collection<org.springframework.messaging.Message<?>> messages, String endpointName) {
        return response.failed().stream()
                .map(BatchResultErrorEntry::id)
                .map(failedId -> messages.stream()
                        .filter(candidate -> MessageHeaderUtils.getId(candidate).equals(failedId))
                        .findFirst()
                        .orElseThrow(() -> new SqsAcknowledgementException("Could not correlate ids for acknowledgement failure", Collections.emptyList(), messages, endpointName)))
                .collect(Collectors.toList());
    }

    /**
     * Finds the messages that the batch delete response reports as successfully deleted.
     *
     * @param response the batch delete response
     * @param messages the messages that were part of the request
     * @param endpointName the endpoint the messages belong to
     * @return the messages whose ids appear in the successful entries
     * @throws SqsAcknowledgementException if a successful entry id matches none of the messages
     */
    private Collection<org.springframework.messaging.Message<?>> getSuccessfulAckMessages(DeleteMessageBatchResponse response, Collection<org.springframework.messaging.Message<?>> messages, String endpointName) {
        return response.successful().stream()
                .map(DeleteMessageBatchResultEntry::id)
                .map(deletedId -> messages.stream()
                        .filter(candidate -> MessageHeaderUtils.getId(candidate).equals(deletedId))
                        .findFirst()
                        .orElseThrow(() -> new SqsAcknowledgementException("Could not correlate ids for acknowledgement failure", Collections.emptyList(), messages, endpointName)))
                .collect(Collectors.toList());
    }

    /**
     * Creates an already failed future carrying an {@link SqsAcknowledgementException}.
     *
     * @param endpointName the endpoint the messages belong to
     * @param successfulAckMessages the messages that were acknowledged
     * @param failedAckMessages the messages that could not be acknowledged
     * @param t the underlying cause, or null if there is none
     * @return a future that is completed exceptionally
     */
    private CompletableFuture<DeleteMessageBatchResponse> createAcknowledgementException(String endpointName, Collection<org.springframework.messaging.Message<?>> successfulAckMessages, Collection<org.springframework.messaging.Message<?>> failedAckMessages, @Nullable Throwable t) {
        SqsAcknowledgementException failure = new SqsAcknowledgementException("Error acknowledging messages", successfulAckMessages, failedAckMessages, endpointName, t);
        return CompletableFuture.failedFuture(failure);
    }

    /**
     * Logs the outcome of an acknowledgement attempt.
     * <p>
     * A failure is logged at error level, a response with failed entries at warn level, and a fully
     * successful acknowledgement at trace level.
     *
     * @param endpointName the endpoint the messages belong to
     * @param messages the messages that were part of the request
     * @param response the batch delete response; only read when {@code t} is null
     * @param t the failure, or null if the request succeeded
     */
    private void logAcknowledgement(String endpointName, Collection<org.springframework.messaging.Message<?>> messages, DeleteMessageBatchResponse response, @Nullable Throwable t) {
        if (t != null) {
            logger.error("Error acknowledging in queue {} messages {}", endpointName, MessageHeaderUtils.getId(this.addTypeToMessages(messages)));
            return;
        }
        if (response.failed().isEmpty()) {
            logger.trace("Acknowledged messages in queue {}: {}", endpointName, MessageHeaderUtils.getId(this.addTypeToMessages(messages)));
            return;
        }
        logger.warn("Some messages could not be acknowledged in queue {}: {}", endpointName, response.failed().stream().map(BatchResultErrorEntry::id).toList());
    }

    /**
     * Creates one batch delete entry per message, using the message id as the entry id and the
     * {@code Sqs_ReceiptHandle} header as the receipt handle.
     *
     * @param messages the messages to delete
     * @return the batch delete entries
     */
    private Collection<DeleteMessageBatchRequestEntry> createDeleteMessageEntries(Collection<org.springframework.messaging.Message<?>> messages) {
        return messages.stream()
                .map(message -> {
                    String receiptHandle = MessageHeaderUtils.getHeaderAsString(message, "Sqs_ReceiptHandle");
                    return DeleteMessageBatchRequestEntry.builder()
                            .id(MessageHeaderUtils.getId(message))
                            .receiptHandle(receiptHandle)
                            .build();
                })
                .collect(Collectors.toList());
    }

    /**
     * Builds the receive request once the queue attributes of the endpoint are available.
     *
     * @param endpointName the endpoint to receive from
     * @param pollTimeout how long the request waits for messages
     * @param maxNumberOfMessages the maximum number of messages to receive
     * @param additionalHeaders receive headers, possibly holding visibility timeout and attempt id
     * @return a future with the receive request
     */
    private CompletableFuture<ReceiveMessageRequest> createReceiveMessageRequest(String endpointName, Duration pollTimeout, Integer maxNumberOfMessages, Map<String, Object> additionalHeaders) {
        CompletableFuture<QueueAttributes> attributes = this.getQueueAttributes(endpointName);
        return attributes.thenApply(resolved -> this.doCreateReceiveMessageRequest(pollTimeout, maxNumberOfMessages, resolved, additionalHeaders));
    }

    /**
     * Creates the receive request for a queue.
     * <p>
     * The poll timeout becomes the wait time in whole seconds. Visibility timeout and receive request
     * attempt id are only set when the corresponding header is present.
     *
     * @param pollTimeout how long the request waits for messages
     * @param maxNumberOfMessages the maximum number of messages to receive
     * @param attributes the resolved attributes of the queue
     * @param additionalHeaders receive headers; the visibility timeout is expected as a
     *        {@link Duration} and the attempt id as a {@link UUID}
     * @return the receive request
     * @throws ClassCastException if a present header value has a different type than expected
     */
    private ReceiveMessageRequest doCreateReceiveMessageRequest(Duration pollTimeout, Integer maxNumberOfMessages, QueueAttributes attributes, Map<String, Object> additionalHeaders) {
        int waitSeconds = this.toInt(pollTimeout.toSeconds());
        ReceiveMessageRequest.Builder request = ReceiveMessageRequest.builder()
                .queueUrl(attributes.getQueueUrl())
                .maxNumberOfMessages(maxNumberOfMessages)
                .messageAttributeNames(this.messageAttributeNames)
                .attributeNamesWithStrings(this.messageSystemAttributeNames)
                .waitTimeSeconds(waitSeconds);
        if (additionalHeaders.containsKey("Sqs_VisibilityTimeout")) {
            Duration visibilityTimeout = this.getValueAs(additionalHeaders, "Sqs_VisibilityTimeout", Duration.class);
            request.visibilityTimeout(this.toInt(visibilityTimeout.toSeconds()));
        }
        if (additionalHeaders.containsKey("Sqs_ReceiveRequestAttemptId")) {
            UUID attemptId = this.getValueAs(additionalHeaders, "Sqs_ReceiveRequestAttemptId", UUID.class);
            request.receiveRequestAttemptId(attemptId.toString());
        }
        return request.build();
    }

    /**
     * Narrows a long to an int, capping values above {@link Integer#MAX_VALUE} at that maximum.
     *
     * @param longValue the value to narrow
     * @return {@code Integer.MAX_VALUE} for larger values, otherwise the value cast to int
     */
    private int toInt(long longValue) {
        return (int) Math.min(longValue, (long) Integer.MAX_VALUE);
    }

    /**
     * Reads a header and casts it to the requested type.
     *
     * @param headers the headers to read from
     * @param headerName the name of the header
     * @param valueClass the expected type of the header value
     * @return the header value, or null if the header is absent or mapped to null
     * @throws ClassCastException if the value is not an instance of {@code valueClass}
     */
    private <V> V getValueAs(Map<String, Object> headers, String headerName, Class<V> valueClass) {
        Object rawValue = headers.get(headerName);
        return valueClass.cast(rawValue);
    }

    /**
     * Creates the message converter used when none is configured explicitly.
     *
     * @return a new {@link SqsMessagingMessageConverter} with its default settings
     */
    private static SqsMessagingMessageConverter createDefaultMessageConverter() {
        return new SqsMessagingMessageConverter();
    }

    private static class SqsTemplateBuilderImpl
    implements SqsTemplateBuilder {
        /** Options instance exposed to callers through {@code configure} and read by the template constructor. */
        private final SqsTemplateOptionsImpl options = new SqsTemplateOptionsImpl();
        /** Client for the template; checked for presence in {@code build}. */
        private SqsAsyncClient sqsAsyncClient;
        /** Converter for the template; {@code build} falls back to a default when still null. */
        private MessagingMessageConverter<Message> messageConverter;

        /** Private constructor with no initialization of its own. */
        private SqsTemplateBuilderImpl() {
        }

        /**
         * Sets the client the template will use.
         *
         * @param sqsAsyncClient the client; must not be null
         * @return this builder
         */
        @Override
        public SqsTemplateBuilder sqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
            Assert.notNull(sqsAsyncClient, "sqsAsyncClient must not be null");
            this.sqsAsyncClient = sqsAsyncClient;
            return this;
        }

        @Override
        public SqsTemplateBuilder messageConverter(MessagingMessageConverter<Message> messageConverter) {
            Assert.notNull(messageConverter, (String)"messageConverter must not be null");
            Assert.isNull(this.messageConverter, (String)"messageConverter already configured");
            this.messageConverter = messageConverter;
            return this;
        }

        @Override
        public SqsTemplateBuilder configureDefaultConverter(Consumer<SqsMessagingMessageConverter> messageConverterConfigurer) {
            Assert.notNull(messageConverterConfigurer, (String)"messageConverterConfigurer must not be null");
            Assert.isNull(this.messageConverter, (String)"messageConverter already configured");
            SqsMessagingMessageConverter defaultMessageConverter = SqsTemplate.createDefaultMessageConverter();
            messageConverterConfigurer.accept(defaultMessageConverter);
            this.messageConverter = defaultMessageConverter;
            return this;
        }

        @Override
        public SqsTemplateBuilder configure(Consumer<SqsTemplateOptions> options) {
            Assert.notNull(options, (String)"options must not be null");
            options.accept(this.options);
            return this;
        }

        @Override
        public SqsTemplate build() {
            Assert.notNull((Object)this.sqsAsyncClient, (String)"no sqsAsyncClient set");
            if (this.messageConverter == null) {
                this.messageConverter = SqsTemplate.createDefaultMessageConverter();
            }
            return new SqsTemplate(this);
        }

        @Override
        public SqsOperations buildSyncTemplate() {
            return this.build();
        }

        @Override
        public SqsAsyncOperations buildAsyncTemplate() {
            return this.build();
        }
    }

    /**
     * Mutable holder for the SQS-specific template options, with defaults for every
     * setting. Each setter returns this instance so calls can be chained.
     */
    private static class SqsTemplateOptionsImpl
    extends AbstractMessagingTemplate.AbstractMessagingTemplateOptions<SqsTemplateOptions>
    implements SqsTemplateOptions {
        private Collection<QueueAttributeName> queueAttributeNames = Collections.emptyList();
        private QueueNotFoundStrategy queueNotFoundStrategy = QueueNotFoundStrategy.CREATE;
        private Collection<String> messageAttributeNames = Collections.singletonList("All");
        private Collection<String> messageSystemAttributeNames = Collections.singletonList("All");
        private TemplateContentBasedDeduplication contentBasedDeduplication = TemplateContentBasedDeduplication.AUTO;

        private SqsTemplateOptionsImpl() {
        }

        /**
         * Sets the queue attribute names.
         *
         * @param queueAttributeNames the names, must not be {@code null} or empty
         * @return these options
         */
        @Override
        public SqsTemplateOptions queueAttributeNames(Collection<QueueAttributeName> queueAttributeNames) {
            Assert.notEmpty(queueAttributeNames, "queueAttributeNames cannot be null or empty");
            this.queueAttributeNames = queueAttributeNames;
            return this;
        }

        /**
         * Sets the default queue by delegating to the superclass' default endpoint name.
         *
         * @param defaultQueue the default queue
         * @return these options
         */
        @Override
        public SqsTemplateOptions defaultQueue(String defaultQueue) {
            super.defaultEndpointName(defaultQueue);
            return this;
        }

        /**
         * Sets the queue-not-found strategy. Defaults to
         * {@link QueueNotFoundStrategy#CREATE}.
         *
         * @param queueNotFoundStrategy the strategy, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsTemplateOptions queueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) {
            Assert.notNull(queueNotFoundStrategy, "queueNotFoundStrategy cannot be null");
            this.queueNotFoundStrategy = queueNotFoundStrategy;
            return this;
        }

        /**
         * Sets the message attribute names. Defaults to a single {@code "All"} entry.
         *
         * @param messageAttributeNames the names, stored as given
         * @return these options
         */
        @Override
        public SqsTemplateOptions messageAttributeNames(Collection<String> messageAttributeNames) {
            this.messageAttributeNames = messageAttributeNames;
            return this;
        }

        /**
         * Sets the message system attribute names. Defaults to a single {@code "All"}
         * entry.
         *
         * @param messageSystemAttributeNames the names, converted to their string form
         * @return these options
         */
        @Override
        public SqsTemplateOptions messageSystemAttributeNames(Collection<MessageSystemAttributeName> messageSystemAttributeNames) {
            // These strings are stored in the same field as the wire-format default "All".
            // The SDK enum's toString() returns that wire value (e.g. "SenderId"), whereas
            // name() returns the Java constant identifier (e.g. "SENDER_ID").
            this.messageSystemAttributeNames = messageSystemAttributeNames.stream()
                    .map(MessageSystemAttributeName::toString)
                    .toList();
            return this;
        }

        /**
         * Sets the content-based deduplication mode. Defaults to
         * {@link TemplateContentBasedDeduplication#AUTO}.
         *
         * @param contentBasedDeduplication the mode, stored as given
         * @return these options
         */
        @Override
        public SqsTemplateOptions contentBasedDeduplication(TemplateContentBasedDeduplication contentBasedDeduplication) {
            this.contentBasedDeduplication = contentBasedDeduplication;
            return this;
        }

        /**
         * Sets the observation convention by delegating to the superclass.
         *
         * @param observationConvention the convention, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsTemplateOptions observationConvention(SqsTemplateObservation.Convention observationConvention) {
            Assert.notNull(observationConvention, "observationConvention cannot be null");
            super.observationConvention(observationConvention);
            return this;
        }
    }

    /**
     * Mutable {@link SqsSendOptions} implementation collecting the settings for a single
     * send operation. Each setter validates its argument and returns this instance.
     *
     * @param <T> the payload type
     */
    private static class SqsSendOptionsImpl<T>
    implements SqsSendOptions<T> {
        protected final Map<String, Object> headers = new HashMap<>();
        @Nullable
        private String messageGroupId;
        @Nullable
        private String messageDeduplicationId;
        @Nullable
        protected String queue;
        @Nullable
        protected T payload;
        @Nullable
        protected Integer delay;

        private SqsSendOptionsImpl() {
        }

        /**
         * Sets the target queue.
         *
         * @param queue the queue, must contain text
         * @return these options
         */
        @Override
        public SqsSendOptionsImpl<T> queue(String queue) {
            Assert.hasText(queue, "queue must have text");
            this.queue = queue;
            return this;
        }

        /**
         * Sets the payload to send.
         *
         * @param payload the payload, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsSendOptionsImpl<T> payload(T payload) {
            Assert.notNull(payload, "payload must not be null");
            this.payload = payload;
            return this;
        }

        /**
         * Adds a single header, replacing any value already stored under the same name.
         *
         * @param headerName the header name, must contain text
         * @param headerValue the header value, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsSendOptionsImpl<T> header(String headerName, Object headerValue) {
            Assert.hasText(headerName, "headerName must have text");
            Assert.notNull(headerValue, "headerValue must not be null");
            this.headers.put(headerName, headerValue);
            return this;
        }

        /**
         * Adds all entries of the given map to the headers collected so far.
         *
         * @param headers the headers to add, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsSendOptionsImpl<T> headers(Map<String, Object> headers) {
            Assert.notNull(headers, "headers must not be null");
            this.headers.putAll(headers);
            return this;
        }

        /**
         * Sets the delay, in seconds.
         *
         * @param delaySeconds the delay, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsSendOptionsImpl<T> delaySeconds(Integer delaySeconds) {
            Assert.notNull(delaySeconds, "delaySeconds must not be null");
            this.delay = delaySeconds;
            return this;
        }

        /**
         * Sets the message group id.
         *
         * @param messageGroupId the group id, must contain text
         * @return these options
         */
        @Override
        public SqsSendOptions<T> messageGroupId(String messageGroupId) {
            Assert.hasText(messageGroupId, "messageGroupId must have text");
            this.messageGroupId = messageGroupId;
            return this;
        }

        /**
         * Sets the message deduplication id.
         *
         * @param messageDeduplicationId the deduplication id, must contain text
         * @return these options
         */
        @Override
        public SqsSendOptions<T> messageDeduplicationId(String messageDeduplicationId) {
            Assert.hasText(messageDeduplicationId, "messageDeduplicationId must have text");
            this.messageDeduplicationId = messageDeduplicationId;
            return this;
        }
    }

    /**
     * Mutable {@link SqsReceiveOptions} implementation collecting the settings for a
     * single receive operation. Each setter validates its argument and returns this
     * instance.
     */
    private static class SqsReceiveOptionsImpl
    implements SqsReceiveOptions {
        protected final Map<String, Object> additionalHeaders = new HashMap<>();
        @Nullable
        protected String queue;
        @Nullable
        protected Duration pollTimeout;
        @Nullable
        protected Duration visibilityTimeout;
        @Nullable
        protected Integer maxNumberOfMessages;
        @Nullable
        private UUID receiveRequestAttemptId;

        private SqsReceiveOptionsImpl() {
        }

        /**
         * Sets the queue to receive from.
         *
         * @param queue the queue, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl queue(String queue) {
            Assert.notNull(queue, "queue must not be null");
            this.queue = queue;
            return this;
        }

        /**
         * Sets the poll timeout.
         *
         * @param pollTimeout the timeout, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl pollTimeout(Duration pollTimeout) {
            Assert.notNull(pollTimeout, "pollTimeout must not be null");
            this.pollTimeout = pollTimeout;
            return this;
        }

        /**
         * Sets the visibility timeout.
         *
         * @param visibilityTimeout the timeout, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl visibilityTimeout(Duration visibilityTimeout) {
            Assert.notNull(visibilityTimeout, "visibilityTimeout must not be null");
            this.visibilityTimeout = visibilityTimeout;
            return this;
        }

        /**
         * Sets the maximum number of messages to receive.
         *
         * @param maxNumberOfMessages the maximum, must not be {@code null} and must lie
         * in the range 1 to 10 inclusive
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl maxNumberOfMessages(Integer maxNumberOfMessages) {
            Assert.notNull(maxNumberOfMessages, "maxNumberOfMessages must not be null");
            // Zero is rejected, so the message states the real lower bound of 1.
            Assert.isTrue(maxNumberOfMessages >= 1 && maxNumberOfMessages <= 10, "maxNumberOfMessages must be between 1 and 10");
            this.maxNumberOfMessages = maxNumberOfMessages;
            return this;
        }

        /**
         * Adds a single additional header, replacing any value already stored under the
         * same name.
         *
         * @param name the header name, must not be {@code null}
         * @param value the header value, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl additionalHeader(String name, Object value) {
            Assert.notNull(name, "name must not be null");
            Assert.notNull(value, "value must not be null");
            this.additionalHeaders.put(name, value);
            return this;
        }

        /**
         * Adds all entries of the given map to the additional headers collected so far.
         *
         * @param additionalHeaders the headers to add, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl additionalHeaders(Map<String, Object> additionalHeaders) {
            Assert.notNull(additionalHeaders, "additionalHeaders must not be null");
            this.additionalHeaders.putAll(additionalHeaders);
            return this;
        }

        /**
         * Sets the receive request attempt id.
         *
         * @param receiveRequestAttemptId the attempt id, must not be {@code null}
         * @return these options
         */
        @Override
        public SqsReceiveOptionsImpl receiveRequestAttemptId(UUID receiveRequestAttemptId) {
            Assert.notNull(receiveRequestAttemptId, "receiveRequestAttemptId must not be null");
            this.receiveRequestAttemptId = receiveRequestAttemptId;
            return this;
        }
    }

    /**
     * {@link AcknowledgementCallback} that acknowledges messages by asking the enclosing
     * template to delete them from the queue named in their {@code Sqs_QueueName} header.
     *
     * @param <T> the message payload type
     */
    private class TemplateAcknowledgementCallback<T>
    implements AcknowledgementCallback<T> {
        private TemplateAcknowledgementCallback() {
        }

        /**
         * Deletes a single message.
         *
         * @param message the message to acknowledge
         * @return the future returned by the template's delete call
         */
        @Override
        public CompletableFuture<Void> onAcknowledge(org.springframework.messaging.Message<T> message) {
            String queueName = MessageHeaderUtils.getHeaderAsString(message, "Sqs_QueueName");
            return SqsTemplate.this.deleteMessages(queueName, Collections.singletonList(message));
        }

        /**
         * Deletes a batch of messages. The queue name is read from the first message of
         * the batch only.
         *
         * @param messages the messages to acknowledge
         * @return an already-completed future when the batch is empty, otherwise the
         * future returned by the template's delete call
         */
        @Override
        public CompletableFuture<Void> onAcknowledge(Collection<org.springframework.messaging.Message<T>> messages) {
            if (messages.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            String queueName = MessageHeaderUtils.getHeaderAsString(messages.iterator().next(), "Sqs_QueueName");
            // The batch is copied into a fresh list before being handed to the template.
            return SqsTemplate.this.deleteMessages(queueName, messages.stream().collect(Collectors.toList()));
        }
    }
}

