/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spinnaker.kork.pubsub.aws;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.aws.ARN;
import com.netflix.spinnaker.kork.pubsub.aws.AmazonSubscriptionInformation;
import com.netflix.spinnaker.kork.pubsub.aws.PubSubUtils;
import com.netflix.spinnaker.kork.pubsub.aws.api.AmazonMessageAcknowledger;
import com.netflix.spinnaker.kork.pubsub.aws.api.AmazonPubsubMessageHandler;
import com.netflix.spinnaker.kork.pubsub.aws.config.AmazonPubsubProperties;
import com.netflix.spinnaker.kork.pubsub.model.PubsubSubscriber;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Long-running worker that polls a single Amazon SQS queue and hands every received message to
 * an {@link AmazonPubsubMessageHandler}.
 *
 * <p>On start-up the worker asks {@link PubSubUtils} to ensure the queue exists and to subscribe
 * it to the configured SNS topic, then polls for as long as the {@code isEnabled} supplier yields
 * {@code true}. Messages that are handled without an exception are acked through the
 * {@link AmazonMessageAcknowledger}; messages whose handler throws are nacked.
 *
 * <p>{@link #run()} only returns when the executing thread is interrupted (or throws when the
 * queue cannot be initialized), so it is presumably meant to be run on a dedicated thread.
 */
public class SQSSubscriber implements Runnable, PubsubSubscriber {
    private static final Logger log = LoggerFactory.getLogger(SQSSubscriber.class);

    /** Back-off, in milliseconds, applied after an unexpected error before polling resumes. */
    private static final long RESTART_DELAY_MILLIS = 500L;

    /** Interval, in milliseconds, between checks of the enabled flag while polling is disabled. */
    private static final long DISABLED_CHECK_INTERVAL_MILLIS = 500L;

    private final AmazonSNS amazonSNS;
    private final AmazonSQS amazonSQS;
    private final AmazonPubsubProperties.AmazonPubsubSubscription subscription;
    private final AmazonPubsubMessageHandler messageHandler;
    private final AmazonMessageAcknowledger messageAcknowledger;
    private final Registry registry;
    private final Supplier<Boolean> isEnabled;
    private final ARN queueARN;
    private final ARN topicARN;

    /** Populated by {@link #initializeQueue()}; read while polling and when acking/nacking. */
    private AmazonSubscriptionInformation subscriptionInfo;

    /**
     * Creates a subscriber for the given subscription. No AWS calls are made here; the queue and
     * topic ARNs are only parsed from the subscription properties.
     *
     * @param subscription subscription properties (queue/topic ARNs, polling settings)
     * @param messageHandler receives every message read from the queue
     * @param messageAcknowledger used to ack or nack each message after handling
     * @param amazonSNS SNS client used when subscribing the queue to the topic
     * @param amazonSQS SQS client used to create the queue and receive messages
     * @param isEnabled polled before every receive call; polling is suspended while it is false
     * @param registry metrics registry for the processed/failed counters
     */
    public SQSSubscriber(AmazonPubsubProperties.AmazonPubsubSubscription subscription, AmazonPubsubMessageHandler messageHandler, AmazonMessageAcknowledger messageAcknowledger, AmazonSNS amazonSNS, AmazonSQS amazonSQS, Supplier<Boolean> isEnabled, Registry registry) {
        this.subscription = subscription;
        this.messageHandler = messageHandler;
        this.messageAcknowledger = messageAcknowledger;
        this.amazonSNS = amazonSNS;
        this.amazonSQS = amazonSQS;
        this.isEnabled = isEnabled;
        this.registry = registry;
        this.queueARN = new ARN(subscription.getQueueARN());
        this.topicARN = new ARN(subscription.getTopicARN());
    }

    /**
     * Returns the name used for this worker in log messages.
     *
     * @return the queue ARN string followed by {@code "/SQSSubscriber"}
     */
    public String getWorkerName() {
        return queueARN.getArn() + "/" + SQSSubscriber.class.getSimpleName();
    }

    /**
     * Returns the identifier of the pubsub system this subscriber belongs to.
     *
     * @return always {@code "amazon"}
     */
    public String getPubsubSystem() {
        return "amazon";
    }

    /**
     * Returns the configured name of the subscription.
     *
     * @return the name taken from the subscription properties
     */
    public String getSubscriptionName() {
        return subscription.getName();
    }

    /**
     * Returns the subscriber name, which is the same as {@link #getSubscriptionName()}.
     *
     * @return the subscription name
     */
    public String getName() {
        return getSubscriptionName();
    }

    /**
     * Initializes the queue and then polls it until the executing thread is interrupted.
     *
     * <p>A {@link QueueDoesNotExistException} during polling triggers re-initialization of the
     * queue; a failure of that re-initialization propagates to the caller. Any other exception is
     * logged and polling resumes after a short back-off. While the subscriber is disabled the
     * enabled flag is re-checked periodically instead of in a tight loop.
     *
     * @throws RuntimeException if the initial (or a repeated) queue initialization fails
     */
    @Override
    public void run() {
        log.info("Starting {}", getWorkerName());
        try {
            initializeQueue();
        } catch (Exception e) {
            log.error("Error initializing queue {}", queueARN, e);
            throw e;
        }

        boolean keepRunning = true;
        while (keepRunning) {
            try {
                listenForMessages();
                // A normal return means the subscriber is currently disabled; wait before
                // re-checking the flag so a disabled worker does not spin on the CPU.
                keepRunning = pause(DISABLED_CHECK_INTERVAL_MILLIS);
            } catch (QueueDoesNotExistException e) {
                log.warn("Queue {} does not exist, recreating", queueARN, e);
                initializeQueue();
            } catch (Exception e) {
                log.error("Unexpected error running {}, restarting worker", getWorkerName(), e);
                keepRunning = pause(RESTART_DELAY_MILLIS);
            }
        }
        log.info("Stopping {} after interruption", getWorkerName());
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} if the sleep completed; {@code false} if the thread was interrupted,
     *     in which case the interrupt status has been restored so callers up the stack see it
     */
    private boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            log.error("Thread {} interrupted while sleeping", getWorkerName(), e);
            // Restore the flag that Thread.sleep cleared; swallowing it would hide the
            // shutdown request from the rest of the thread.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Ensures the queue exists, subscribes it to the topic and records the resulting queue URL
     * in {@link #subscriptionInfo}. The actual AWS interaction is delegated to
     * {@link PubSubUtils}.
     */
    private void initializeQueue() {
        String queueUrl = PubSubUtils.ensureQueueExists(amazonSQS, queueARN, topicARN, subscription.getSqsMessageRetentionPeriodSeconds());
        PubSubUtils.subscribeToTopic(amazonSNS, topicARN, queueARN);
        subscriptionInfo = AmazonSubscriptionInformation.builder()
                .amazonSNS(amazonSNS)
                .amazonSQS(amazonSQS)
                .properties(subscription)
                .queueUrl(queueUrl)
                .build();
    }

    /**
     * Repeatedly receives message batches from the queue and dispatches each message to
     * {@link #handleMessage(Message)}. Returns as soon as the enabled supplier yields false;
     * exceptions from the SQS client or from acking/nacking propagate to the caller.
     */
    private void listenForMessages() {
        while (isEnabled.get()) {
            ReceiveMessageRequest request = new ReceiveMessageRequest(subscriptionInfo.queueUrl)
                    .withMaxNumberOfMessages(subscription.getMaxNumberOfMessages())
                    .withVisibilityTimeout(subscription.getVisibilityTimeout())
                    .withWaitTimeSeconds(subscription.getWaitTimeSeconds())
                    .withMessageAttributeNames("All");
            ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(request);
            if (receiveMessageResult.getMessages().isEmpty()) {
                log.debug("Received no messages for queue {}", queueARN);
                continue;
            }
            receiveMessageResult.getMessages().forEach(this::handleMessage);
        }
    }

    /**
     * Passes one message to the handler, records the outcome in the metrics registry and then
     * acks (handler succeeded) or nacks (handler threw) the message.
     *
     * <p>The ack/nack call is deliberately outside the try block: an exception thrown by the
     * acknowledger is not counted as a processing failure and propagates to the caller.
     *
     * @param message the SQS message to process
     */
    private void handleMessage(Message message) {
        boolean processed;
        try {
            messageHandler.handleMessage(message);
            getSuccessCounter().increment();
            processed = true;
        } catch (Exception e) {
            log.error("failed to process message {}", message, e);
            getErrorCounter(e).increment();
            processed = false;
        }

        if (processed) {
            messageAcknowledger.ack(subscriptionInfo, message);
        } else {
            messageAcknowledger.nack(subscriptionInfo, message);
        }
    }

    /**
     * Looks up the counter of successfully processed messages.
     *
     * @return the {@code pubsub.amazon.processed} counter tagged with the subscription name
     */
    private Counter getSuccessCounter() {
        return registry.counter("pubsub.amazon.processed", "subscription", getSubscriptionName());
    }

    /**
     * Looks up the counter of failed messages for the given failure.
     *
     * @param e the exception thrown while handling a message
     * @return the {@code pubsub.amazon.failed} counter tagged with the subscription name and the
     *     simple class name of {@code e}
     */
    private Counter getErrorCounter(Exception e) {
        return registry.counter("pubsub.amazon.failed", "subscription", getSubscriptionName(), "exceptionClass", e.getClass().getSimpleName());
    }
}

