/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.sqs.internal.source;

import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import java.util.Map;
import org.mule.extension.sqs.internal.connection.SQSConnection;
import org.mule.extension.sqs.internal.operation.SQSModelFactory;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageReceiver
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
    private volatile boolean running = true;
    private final SourceCallback<String, Map<String, Object>> sourceCallback;
    private final SQSConnection connection;
    private final int visibilityTimeout;
    private final boolean preserveMessages;
    private final int numberOfMessages;
    private final String queueUrl;
    private final int waitTime;
    private final long pollingPeriodMilliseconds;

    public MessageReceiver(SourceCallback<String, Map<String, Object>> sourceCallback, SQSConnection connection, int visibilityTimeout, boolean preserveMessages, int numberOfMessages, String queueUrl, int waitTime, long pollingPeriodMilliseconds) {
        this.sourceCallback = sourceCallback;
        this.connection = connection;
        this.visibilityTimeout = visibilityTimeout;
        this.preserveMessages = preserveMessages;
        this.numberOfMessages = numberOfMessages;
        this.queueUrl = queueUrl;
        this.waitTime = waitTime;
        this.pollingPeriodMilliseconds = pollingPeriodMilliseconds;
    }

    @Override
    public void run() {
        SourceCallbackContext context = this.sourceCallback.createContext();
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withAttributeNames(new String[]{"All"}).withMessageAttributeNames(new String[]{"All"}).withQueueUrl(this.queueUrl).withMaxNumberOfMessages(Integer.valueOf(this.numberOfMessages)).withVisibilityTimeout(Integer.valueOf(this.visibilityTimeout)).withWaitTimeSeconds(Integer.valueOf(this.waitTime));
        while (this.running) {
            try {
                logger.debug("Handling received message from queue: {} ", (Object)this.queueUrl);
                this.handleRequest(receiveMessageRequest, context);
                if (this.pollingPeriodMilliseconds <= 0L) continue;
                Thread.sleep(this.pollingPeriodMilliseconds);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.sourceCallback.onConnectionException(new ConnectionException((Throwable)e, (Object)this.connection));
                return;
            }
            catch (Exception e) {
                logger.error("Handle message request failed with exception: ", (Throwable)e);
                this.sourceCallback.onConnectionException(new ConnectionException((Throwable)e, (Object)this.connection));
                return;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        MessageReceiver messageReceiver = this;
        synchronized (messageReceiver) {
            this.running = false;
        }
    }

    private void handleRequest(ReceiveMessageRequest request, SourceCallbackContext context) {
        ReceiveMessageResult receiveMessageResult = this.connection.receiveMessage(request);
        receiveMessageResult.getMessages().forEach(receivedMessage -> {
            Result result = Result.builder().output((Object)receivedMessage.getBody()).attributes(SQSModelFactory.wrapMessageAttributes(receivedMessage)).build();
            this.sourceCallback.handle(result, context);
            if (!this.preserveMessages) {
                this.connection.deleteMessage(new DeleteMessageRequest(this.queueUrl, receivedMessage.getReceiptHandle()));
            }
        });
    }
}

