/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.sqs.internal.source;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.extension.sqs.internal.config.SQSConfiguration;
import org.mule.extension.sqs.internal.connection.SQSConnection;
import org.mule.extension.sqs.internal.error.exception.NumberOfThreadsOutOfRangeException;
import org.mule.extension.sqs.internal.error.exception.SQSRuntimeException;
import org.mule.extension.sqs.internal.error.exception.WaitTimeOutOfRangeException;
import org.mule.extension.sqs.internal.source.MessageReceiver;
import org.mule.extension.sqs.internal.source.params.PollingParameters;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DisplayName(value="Receive messages")
@Alias(value="receivemessages")
@EmitsResponse
@MediaType(value="text/plain")
@ClusterSupport(value=SourceClusterSupport.DEFAULT_ALL_NODES)
public class ReceiveMessagesSource
extends Source<String, Map<String, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(ReceiveMessagesSource.class);
    private static final String SCHEDULER_NAME = "mule-amazon-sqs-connector_receive-messages-source";
    @Config
    private SQSConfiguration configuration;
    @Connection
    private ConnectionProvider<SQSConnection> connectionProvider;
    @Parameter
    @Optional(defaultValue="30")
    @Summary(value="The duration the retrieved messages are hidden from subsequent calls to retrieve.")
    private int visibilityTimeout;
    @Parameter
    @Optional(defaultValue="SECONDS")
    @Summary(value="Time unit to be used in the Visibility Timeout configuration.")
    private TimeUnit visibilityTimeoutUnit;
    @Parameter
    @Optional(defaultValue="false")
    @Summary(value="Flag that indicates if you want to preserve the messages in the queue. False by default, so the messages are going to be deleted.")
    private boolean preserveMessages;
    @Parameter
    @Optional(defaultValue="1")
    @Summary(value="The number of messages to be retrieved on each call (10 messages max). By default, 1 message will be retrieved.")
    private int numberOfMessages;
    @Parameter
    @Optional
    @Summary(value="The queue URL where messages are to be fetched from.")
    private String queueUrl;
    @DisplayName(value="Wait time")
    @Parameter
    @ConfigOverride
    @Placement(tab="Advanced", order=1)
    @Summary(value="Maximum duration of a single poll, in seconds. Valid values are from 0 to 20. Use 0 for short polling.")
    private int waitTime;
    @DisplayName(value="Number of consuming threads")
    @Parameter
    @Optional(defaultValue="1")
    @Placement(tab="Advanced", order=2)
    @Summary(value="Number of threads used to consume the messages in the inbound flow. This value must be an integer greater than 0.")
    private int numberOfConsumingThreads;
    @ParameterGroup(name="Polling parameters")
    private PollingParameters pollingParameters;
    private MessageReceiver messageReceiver;
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private SchedulerConfig schedulerConfig;
    private Scheduler scheduler;

    public void onStart(SourceCallback<String, Map<String, Object>> sourceCallback) throws MuleException {
        if (this.waitTime < 0 || this.waitTime > 20) {
            throw new WaitTimeOutOfRangeException(this.waitTime);
        }
        if (this.numberOfConsumingThreads <= 0) {
            throw new NumberOfThreadsOutOfRangeException(this.numberOfConsumingThreads);
        }
        if (this.pollingParameters.getStartDelayInMillis() >= 0) {
            try {
                Thread.sleep(this.pollingParameters.getStartDelayInMillis());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SQSRuntimeException(e);
            }
        }
        SQSConnection sqsClient = (SQSConnection)this.connectionProvider.connect();
        this.messageReceiver = new MessageReceiver(sourceCallback, sqsClient, Math.toIntExact(this.visibilityTimeoutUnit.toSeconds(this.visibilityTimeout)), this.preserveMessages, this.numberOfMessages, this.configuration.getUrl(this.queueUrl), this.waitTime, this.pollingParameters.getFrequencyInMillis());
        this.scheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName(SCHEDULER_NAME).withMaxConcurrentTasks(this.numberOfConsumingThreads).withShutdownTimeout(30L, TimeUnit.SECONDS));
        for (int i = 0; i < this.numberOfConsumingThreads; ++i) {
            logger.debug("Starting message receiver on thread {}", (Object)i);
            this.scheduler.execute((Runnable)this.messageReceiver);
        }
    }

    public void onStop() {
        if (this.messageReceiver != null) {
            logger.debug("Stopping message receiver");
            this.messageReceiver.stop();
        }
        if (this.scheduler != null) {
            logger.debug("Stopping scheduler with name {}", (Object)SCHEDULER_NAME);
            this.scheduler.stop();
        }
    }
}

