/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.aws.sqs;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsManager;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsDeleteAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsNothingAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.PausablePollingStream;
import io.vertx.core.Context;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;

public class SqsInboundChannel {
    private final String channel;
    private final SqsAsyncClient client;
    private final io.vertx.mutiny.core.Context context;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Uni<String> queueUrlUni;
    private final Flow.Publisher<? extends Message<?>> stream;
    private final ScheduledExecutorService requestExecutor;
    private final int waitTimeSeconds;
    private final int maxNumberOfMessages;
    private final SqsReceiveMessageRequestCustomizer customizer;
    private final long retries;
    private final List<Throwable> failures = new ArrayList<Throwable>();
    private final boolean healthEnabled;
    private final List<String> messageAttributeNames;
    private final Integer visibilityTimeout;

    public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsManager sqsManager, SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper) {
        this.channel = conf.getChannel();
        this.healthEnabled = conf.getHealthEnabled();
        this.retries = conf.getReceiveRequestRetries();
        this.client = sqsManager.getClient(conf);
        this.queueUrlUni = sqsManager.getQueueUrl(conf).memoize().indefinitely();
        this.context = io.vertx.mutiny.core.Context.newInstance((Context)((VertxInternal)vertx.getDelegate()).createEventLoopContext());
        this.requestExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "smallrye-aws-sqs-request-thread-" + this.channel));
        this.waitTimeSeconds = conf.getWaitTimeSeconds();
        this.visibilityTimeout = conf.getVisibilityTimeout().orElse(null);
        this.maxNumberOfMessages = conf.getMaxNumberOfMessages();
        this.messageAttributeNames = this.getMessageAttributeNames(conf);
        this.customizer = customizer;
        SqsAckHandler ackHandler = conf.getAckDelete() != false ? new SqsDeleteAckHandler(this.client, this.queueUrlUni) : new SqsNothingAckHandler();
        PausablePollingStream pollingStream = new PausablePollingStream(this.channel, this.request(null, 0), (messages, processor) -> {
            if (messages != null) {
                for (software.amazon.awssdk.services.sqs.model.Message message : messages) {
                    processor.onNext(message);
                }
            }
        }, this.requestExecutor, this.maxNumberOfMessages * 2, conf.getReceiveRequestPauseResume().booleanValue());
        this.stream = Multi.createFrom().deferred(() -> this.queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream())).emitOn(r -> this.context.runOnContext(r)).onItem().transform(message -> new SqsMessage((software.amazon.awssdk.services.sqs.model.Message)message, jsonMapper, ackHandler)).onFailure().invoke(throwable -> {
            AwsSqsLogging.log.errorReceivingMessage(this.channel, (Throwable)throwable);
            this.reportFailure((Throwable)throwable, false);
        });
    }

    private List<String> getMessageAttributeNames(SqsConnectorIncomingConfiguration conf) {
        ArrayList<String> names = new ArrayList<String>();
        names.add("_classname");
        conf.getReceiveRequestMessageAttributeNames().ifPresent(s -> names.addAll(Arrays.asList(s.split(","))));
        return names;
    }

    public synchronized void reportFailure(Throwable failure, boolean fatal) {
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(failure);
        if (fatal) {
            this.close();
        }
    }

    public Uni<List<software.amazon.awssdk.services.sqs.model.Message>> request(String requestId, int retryCount) {
        return this.queueUrlUni.map(queueUrl -> {
            ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.builder().queueUrl(queueUrl).messageAttributeNames(this.messageAttributeNames).waitTimeSeconds(Integer.valueOf(this.waitTimeSeconds)).maxNumberOfMessages(Integer.valueOf(this.maxNumberOfMessages));
            if (requestId != null) {
                builder.receiveRequestAttemptId(requestId);
            }
            if (this.visibilityTimeout != null) {
                builder.visibilityTimeout(this.visibilityTimeout);
            }
            if (this.customizer != null) {
                this.customizer.customize(builder);
            }
            return builder;
        }).chain(builder -> Uni.createFrom().completionStage(() -> this.client.receiveMessage((ReceiveMessageRequest)builder.build()))).onItem().transform(response -> {
            List messages = response.messages();
            if (messages == null || messages.isEmpty()) {
                AwsSqsLogging.log.receivedEmptyMessage();
                return null;
            }
            if (AwsSqsLogging.log.isTraceEnabled()) {
                messages.forEach(m -> AwsSqsLogging.log.receivedMessage(m.body()));
            }
            return messages;
        }).onFailure(e -> e instanceof SqsException && ((SqsException)e).retryable()).recoverWithUni(e -> {
            if ((long)retryCount < this.retries) {
                return this.request(((SqsException)e).requestId(), retryCount + 1);
            }
            return Uni.createFrom().failure(e);
        });
    }

    public Flow.Publisher<? extends Message<?>> getStream() {
        return this.stream;
    }

    public void close() {
        this.closed.set(true);
        this.requestExecutor.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            ArrayList<Throwable> actualFailures;
            SqsInboundChannel sqsInboundChannel = this;
            synchronized (sqsInboundChannel) {
                actualFailures = new ArrayList<Throwable>(this.failures);
            }
            if (!actualFailures.isEmpty()) {
                builder.add(this.channel, false, actualFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
            } else {
                builder.add(this.channel, true);
            }
        }
    }
}

