/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.aws.sqs;

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsInboundChannel;
import io.smallrye.reactive.messaging.aws.sqs.SqsManager;
import io.smallrye.reactive.messaging.aws.sqs.SqsOutboundChannel;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxJsonMapping;
import io.vertx.mutiny.core.Vertx;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

/**
 * Reactive-messaging connector registered under the name {@code smallrye-sqs}.
 *
 * <p>The connector creates one {@link SqsInboundChannel} per incoming channel
 * configuration and one {@link SqsOutboundChannel} per outgoing channel
 * configuration, remembers every channel it created, closes them when the
 * application scope is about to be destroyed, and aggregates their liveness
 * information into a single {@link HealthReport}.
 *
 * <p>The channel registries are {@code static}, i.e. shared by every instance of
 * this class loaded by the same class loader; they are therefore emptied on
 * termination so that closed channels do not outlive the application scope.
 */
@ApplicationScoped
@Connector(value=SqsConnector.CONNECTOR_NAME)
@ConnectorAttributes(value={
    @ConnectorAttribute(name="queue", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the SQS queue, defaults to channel name if not provided"),
    @ConnectorAttribute(name="queue.url", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The url of the SQS queue"),
    @ConnectorAttribute(name="region", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the SQS region"),
    @ConnectorAttribute(name="endpoint-override", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The endpoint override"),
    @ConnectorAttribute(name="credentials-provider", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The credential provider to be used in the client"),
    @ConnectorAttribute(name="health-enabled", type="boolean", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether health reporting is enabled (default) or disabled", defaultValue="true"),
    @ConnectorAttribute(name="group.id", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="When set, sends messages with the specified group id"),
    @ConnectorAttribute(name="wait-time-seconds", type="int", direction=ConnectorAttribute.Direction.INCOMING, description="The maximum amount of time in seconds to wait for messages to be received", defaultValue="1"),
    @ConnectorAttribute(name="max-number-of-messages", type="int", direction=ConnectorAttribute.Direction.INCOMING, description="The maximum number of messages to receive", defaultValue="10"),
    @ConnectorAttribute(name="visibility-timeout", type="int", direction=ConnectorAttribute.Direction.INCOMING, description="The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request"),
    @ConnectorAttribute(name="receive.request.message-attribute-names", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="The message attribute names to retrieve when receiving messages."),
    @ConnectorAttribute(name="receive.request.customizer", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided"),
    @ConnectorAttribute(name="receive.request.retries", type="long", direction=ConnectorAttribute.Direction.INCOMING, description="If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled.", defaultValue="2147483647"),
    @ConnectorAttribute(name="receive.request.pause.resume", type="boolean", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue="true"),
    @ConnectorAttribute(name="ack.delete", type="boolean", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the acknowledgement deletes the message from the queue", defaultValue="true")
})
public class SqsConnector
implements InboundConnector,
OutboundConnector,
HealthReporter {
    /** Injected manager handed to every channel this connector creates. */
    @Inject
    private SqsManager sqsManager;
    /** Injected holder from which the Vert.x instance is obtained in {@link #init()}. */
    @Inject
    ExecutionHolder executionHolder;
    /** All available receive-request customizers; one is looked up by identifier per incoming channel. */
    @Inject
    @Any
    Instance<SqsReceiveMessageRequestCustomizer> customizers;
    /** Candidate JSON mapping beans; may be unsatisfied, in which case a Vert.x based mapping is used. */
    @Inject
    Instance<JsonMapping> jsonMappers;
    /** Vert.x instance passed to inbound channels; assigned in {@link #init()}. */
    Vertx vertx;
    /** Every inbound channel created by {@link #getPublisher(Config)} since the last termination. */
    private static final List<SqsInboundChannel> INBOUND_CHANNELS = new CopyOnWriteArrayList<>();
    /** Every outbound channel created by {@link #getSubscriber(Config)} since the last termination. */
    private static final List<SqsOutboundChannel> OUTBOUND_CHANNELS = new CopyOnWriteArrayList<>();
    /** Name under which this connector is registered. */
    public static final String CONNECTOR_NAME = "smallrye-sqs";
    /** Attribute key {@code _classname}; not used inside this class. */
    public static final String CLASS_NAME_ATTRIBUTE = "_classname";
    /** JSON mapping passed to every channel; assigned in {@link #init()}. */
    private JsonMapping jsonMapping;

    /**
     * Resolves the Vert.x instance and the JSON mapping once injection is complete.
     *
     * <p>Only the "no {@link JsonMapping} bean" case falls back to {@link VertxJsonMapping};
     * any other resolution problem reported by {@link Instance#get()} propagates to the caller.
     */
    @PostConstruct
    void init() {
        this.vertx = this.executionHolder.vertx();
        this.jsonMapping = this.jsonMappers.isUnsatisfied() ? new VertxJsonMapping() : this.jsonMappers.get();
    }

    /**
     * Closes every channel created by this connector when the application scope is
     * about to be destroyed, then forgets them.
     *
     * <p>The observer is only notified if the bean instance already exists.
     *
     * @param event the scope-destruction event payload (unused)
     */
    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        INBOUND_CHANNELS.forEach(SqsInboundChannel::close);
        OUTBOUND_CHANNELS.forEach(SqsOutboundChannel::close);
        // The registries are static: without clearing them, closed channels would stay
        // strongly reachable, be closed again on a later termination, and keep being
        // included in liveness reports after a restart in the same class loader.
        INBOUND_CHANNELS.clear();
        OUTBOUND_CHANNELS.clear();
    }

    /**
     * Creates and registers the inbound channel for the given channel configuration.
     *
     * <p>The receive-request customizer is looked up by the configured identifier,
     * falling back to the channel name; when no matching bean is found the channel
     * receives {@code null} as customizer.
     *
     * @param config the channel configuration
     * @return the stream exposed by the newly created inbound channel
     */
    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        SqsConnectorIncomingConfiguration conf = new SqsConnectorIncomingConfiguration(config);
        String customizerId = conf.getReceiveRequestCustomizer().orElse(conf.getChannel());
        SqsReceiveMessageRequestCustomizer customizer = CDIUtils.getInstanceById(this.customizers, customizerId, () -> null);
        SqsInboundChannel channel = new SqsInboundChannel(conf, this.vertx, this.sqsManager, customizer, this.jsonMapping);
        INBOUND_CHANNELS.add(channel);
        return channel.getStream();
    }

    /**
     * Creates and registers the outbound channel for the given channel configuration.
     *
     * @param config the channel configuration
     * @return the subscriber exposed by the newly created outbound channel
     */
    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        SqsConnectorOutgoingConfiguration conf = new SqsConnectorOutgoingConfiguration(config);
        SqsOutboundChannel channel = new SqsOutboundChannel(conf, this.sqsManager, this.jsonMapping);
        OUTBOUND_CHANNELS.add(channel);
        return channel.getSubscriber();
    }

    /**
     * Builds a liveness report by letting every registered inbound and outbound
     * channel contribute to a shared report builder.
     *
     * @return the aggregated liveness report
     */
    public HealthReport getLiveness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (SqsInboundChannel inbound : INBOUND_CHANNELS) {
            inbound.isAlive(builder);
        }
        for (SqsOutboundChannel outbound : OUTBOUND_CHANNELS) {
            outbound.isAlive(builder);
        }
        return builder.build();
    }
}

