/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.aws.sqs;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsManager;
import io.smallrye.reactive.messaging.aws.sqs.SqsOutboundMetadata;
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class SqsOutboundChannel {
    private final Flow.Subscriber<? extends Message<?>> subscriber;
    private final SqsAsyncClient client;
    private final String channel;
    private final Uni<String> queueUrlUni;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final JsonMapping jsonMapping;
    private final List<Throwable> failures = new ArrayList<Throwable>();
    private final boolean healthEnabled;
    private final String groupId;

    public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping) {
        this.channel = conf.getChannel();
        this.healthEnabled = conf.getHealthEnabled();
        this.client = sqsManager.getClient(conf);
        this.queueUrlUni = sqsManager.getQueueUrl(conf).memoize().indefinitely();
        this.groupId = conf.getGroupId().orElse(null);
        this.jsonMapping = jsonMapping;
        this.subscriber = MultiUtils.via(multi -> multi.onSubscription().call(s -> this.queueUrlUni).call(m -> this.publishMessage(this.client, (Message<?>)m)).onFailure().invoke(f -> {
            AwsSqsLogging.log.unableToDispatch(this.channel, (Throwable)f);
            this.reportFailure((Throwable)f);
        }));
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber() {
        return this.subscriber;
    }

    private Uni<Void> publishMessage(SqsAsyncClient client, Message<?> m) {
        if (this.closed.get()) {
            return Uni.createFrom().voidItem();
        }
        if (m.getPayload() == null) {
            return Uni.createFrom().nullItem();
        }
        return this.queueUrlUni.map(queueUrl -> this.getSendMessageRequest((String)queueUrl, m)).chain(request -> Uni.createFrom().completionStage(() -> client.sendMessage(request))).invoke(r -> AwsSqsLogging.log.messageSentToChannel(this.channel, r.messageId(), r.sequenceNumber())).onItemOrFailure().transformToUni((response, t) -> {
            if (t == null) {
                OutgoingMessageMetadata.setResultOnMessage((Message)m, (Object)response);
                return Uni.createFrom().completionStage(m.ack());
            }
            return Uni.createFrom().completionStage(m.nack(t));
        });
    }

    private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message<?> m) {
        Object payload = m.getPayload();
        String queueUrl = channelQueueUrl;
        if (payload instanceof SendMessageRequest) {
            return (SendMessageRequest)payload;
        }
        if (payload instanceof SendMessageRequest.Builder) {
            SendMessageRequest.Builder builder = ((SendMessageRequest.Builder)payload).queueUrl(queueUrl);
            if (this.groupId != null) {
                builder.messageGroupId(this.groupId);
            }
            return (SendMessageRequest)builder.build();
        }
        SendMessageRequest.Builder builder = SendMessageRequest.builder();
        HashMap<String, MessageAttributeValue> msgAttributes = new HashMap<String, MessageAttributeValue>();
        String groupId = this.groupId;
        Optional metadata = m.getMetadata(SqsOutboundMetadata.class);
        if (metadata.isPresent()) {
            SqsOutboundMetadata md = (SqsOutboundMetadata)metadata.get();
            if (md.getQueueUrl() != null) {
                queueUrl = md.getQueueUrl();
            }
            if (md.getDeduplicationId() != null) {
                builder.messageDeduplicationId(md.getDeduplicationId());
            }
            if (md.getGroupId() != null) {
                groupId = md.getGroupId();
            }
            if (md.getDelaySeconds() != null) {
                builder.delaySeconds(md.getDelaySeconds());
            }
            if (md.getMessageAttributes() != null) {
                msgAttributes.putAll(md.getMessageAttributes());
            }
        }
        if (payload instanceof software.amazon.awssdk.services.sqs.model.Message) {
            software.amazon.awssdk.services.sqs.model.Message msg = (software.amazon.awssdk.services.sqs.model.Message)payload;
            if (msg.hasAttributes()) {
                msgAttributes.putAll(msg.messageAttributes());
            }
            return (SendMessageRequest)builder.queueUrl(queueUrl).messageGroupId(groupId).messageAttributes(msgAttributes).messageBody(msg.body()).build();
        }
        String messageBody = this.outgoingPayloadClassName(payload, msgAttributes);
        return (SendMessageRequest)builder.queueUrl(queueUrl).messageGroupId(groupId).messageAttributes(msgAttributes).messageBody(messageBody).build();
    }

    private String outgoingPayloadClassName(Object payload, Map<String, MessageAttributeValue> messageAttributes) {
        if (payload instanceof String || payload.getClass().isPrimitive() || this.isPrimitiveBoxed(payload.getClass())) {
            messageAttributes.put("_classname", (MessageAttributeValue)MessageAttributeValue.builder().dataType("String").stringValue(payload.getClass().getName()).build());
            return String.valueOf(payload);
        }
        if (payload.getClass().isArray() && payload.getClass().getComponentType().equals(Byte.TYPE)) {
            return new String((byte[])payload);
        }
        if (this.jsonMapping != null) {
            messageAttributes.put("_classname", (MessageAttributeValue)MessageAttributeValue.builder().dataType("String").stringValue(payload.getClass().getName()).build());
            return this.jsonMapping.toJson(payload);
        }
        return String.valueOf(payload);
    }

    private boolean isPrimitiveBoxed(Class<?> c) {
        return c.equals(Boolean.class) || c.equals(Integer.class) || c.equals(Byte.class) || c.equals(Double.class) || c.equals(Float.class) || c.equals(Short.class) || c.equals(Character.class) || c.equals(Long.class);
    }

    public void close() {
        this.closed.set(true);
    }

    private synchronized void reportFailure(Throwable failure) {
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(failure);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            ArrayList<Throwable> actualFailures;
            SqsOutboundChannel sqsOutboundChannel = this;
            synchronized (sqsOutboundChannel) {
                actualFailures = new ArrayList<Throwable>(this.failures);
            }
            if (!actualFailures.isEmpty()) {
                builder.add(this.channel, false, actualFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
            } else {
                builder.add(this.channel, true);
            }
        }
    }
}

