/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.sqs.eventqueue;

import com.amazonaws.auth.policy.Policy;
import com.amazonaws.auth.policy.Principal;
import com.amazonaws.auth.policy.Resource;
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import com.amazonaws.services.sqs.model.SetQueueAttributesResult;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.metrics.Monitors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;

public class SQSObservableQueue
implements ObservableQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSObservableQueue.class);
    private static final String QUEUE_TYPE = "sqs";
    private final String queueName;
    private final int visibilityTimeoutInSeconds;
    private final int batchSize;
    private final AmazonSQS client;
    private final long pollTimeInMS;
    private final String queueURL;
    private final Scheduler scheduler;
    private volatile boolean running;

    private SQSObservableQueue(String queueName, AmazonSQS client, int visibilityTimeoutInSeconds, int batchSize, long pollTimeInMS, List<String> accountsToAuthorize, Scheduler scheduler) {
        this.queueName = queueName;
        this.client = client;
        this.visibilityTimeoutInSeconds = visibilityTimeoutInSeconds;
        this.batchSize = batchSize;
        this.pollTimeInMS = pollTimeInMS;
        this.queueURL = this.getOrCreateQueue();
        this.scheduler = scheduler;
        this.addPolicy(accountsToAuthorize);
    }

    public Observable<Message> observe() {
        Observable.OnSubscribe<Message> subscriber = this.getOnSubscribe();
        return Observable.create(subscriber);
    }

    public List<String> ack(List<Message> messages) {
        return this.delete(messages);
    }

    public void publish(List<Message> messages) {
        this.publishMessages(messages);
    }

    public long size() {
        GetQueueAttributesResult attributes = this.client.getQueueAttributes(this.queueURL, Collections.singletonList("ApproximateNumberOfMessages"));
        String sizeAsStr = (String)attributes.getAttributes().get("ApproximateNumberOfMessages");
        try {
            return Long.parseLong(sizeAsStr);
        }
        catch (Exception e) {
            return -1L;
        }
    }

    public void setUnackTimeout(Message message, long unackTimeout) {
        int unackTimeoutInSeconds = (int)(unackTimeout / 1000L);
        ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(this.queueURL, message.getReceipt(), Integer.valueOf(unackTimeoutInSeconds));
        this.client.changeMessageVisibility(request);
    }

    public String getType() {
        return QUEUE_TYPE;
    }

    public String getName() {
        return this.queueName;
    }

    public String getURI() {
        return this.queueURL;
    }

    public long getPollTimeInMS() {
        return this.pollTimeInMS;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public int getVisibilityTimeoutInSeconds() {
        return this.visibilityTimeoutInSeconds;
    }

    public void start() {
        LOGGER.info("Started listening to {}:{}", (Object)this.getClass().getSimpleName(), (Object)this.queueName);
        this.running = true;
    }

    public void stop() {
        LOGGER.info("Stopped listening to {}:{}", (Object)this.getClass().getSimpleName(), (Object)this.queueName);
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    String getOrCreateQueue() {
        List<String> queueUrls = this.listQueues(this.queueName);
        if (queueUrls == null || queueUrls.isEmpty()) {
            CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(this.queueName);
            CreateQueueResult result = this.client.createQueue(createQueueRequest);
            return result.getQueueUrl();
        }
        return queueUrls.get(0);
    }

    private String getQueueARN() {
        GetQueueAttributesResult response = this.client.getQueueAttributes(this.queueURL, Collections.singletonList("QueueArn"));
        return (String)response.getAttributes().get("QueueArn");
    }

    private void addPolicy(List<String> accountsToAuthorize) {
        if (accountsToAuthorize == null || accountsToAuthorize.isEmpty()) {
            LOGGER.info("No additional security policies attached for the queue " + this.queueName);
            return;
        }
        LOGGER.info("Authorizing " + accountsToAuthorize + " to the queue " + this.queueName);
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("Policy", this.getPolicy(accountsToAuthorize));
        SetQueueAttributesResult result = this.client.setQueueAttributes(this.queueURL, attributes);
        LOGGER.info("policy attachment result: " + result);
        LOGGER.info("policy attachment result: status=" + result.getSdkHttpMetadata().getHttpStatusCode());
    }

    private String getPolicy(List<String> accountIds) {
        Policy policy = new Policy("AuthorizedWorkerAccessPolicy");
        Statement stmt = new Statement(Statement.Effect.Allow);
        SQSActions action = SQSActions.SendMessage;
        stmt.getActions().add(action);
        stmt.setResources(new LinkedList());
        for (String accountId : accountIds) {
            Principal principal = new Principal(accountId);
            stmt.getPrincipals().add(principal);
        }
        stmt.getResources().add(new Resource(this.getQueueARN()));
        policy.getStatements().add(stmt);
        return policy.toJson();
    }

    private List<String> listQueues(String queueName) {
        ListQueuesRequest listQueuesRequest = new ListQueuesRequest().withQueueNamePrefix(queueName);
        ListQueuesResult resultList = this.client.listQueues(listQueuesRequest);
        return resultList.getQueueUrls().stream().filter(queueUrl -> queueUrl.contains(queueName)).collect(Collectors.toList());
    }

    private void publishMessages(List<Message> messages) {
        LOGGER.debug("Sending {} messages to the SQS queue: {}", (Object)messages.size(), (Object)this.queueName);
        SendMessageBatchRequest batch = new SendMessageBatchRequest(this.queueURL);
        messages.forEach(msg -> {
            SendMessageBatchRequestEntry sendr = new SendMessageBatchRequestEntry(msg.getId(), msg.getPayload());
            batch.getEntries().add(sendr);
        });
        LOGGER.debug("sending {} messages in batch", (Object)batch.getEntries().size());
        SendMessageBatchResult result = this.client.sendMessageBatch(batch);
        LOGGER.debug("send result: {} for SQS queue: {}", (Object)result.getFailed().toString(), (Object)this.queueName);
    }

    List<Message> receiveMessages() {
        try {
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(this.queueURL).withVisibilityTimeout(Integer.valueOf(this.visibilityTimeoutInSeconds)).withMaxNumberOfMessages(Integer.valueOf(this.batchSize));
            ReceiveMessageResult result = this.client.receiveMessage(receiveMessageRequest);
            List<Message> messages = result.getMessages().stream().map(msg -> new Message(msg.getMessageId(), msg.getBody(), msg.getReceiptHandle())).collect(Collectors.toList());
            Monitors.recordEventQueueMessagesProcessed((String)QUEUE_TYPE, (String)this.queueName, (int)messages.size());
            return messages;
        }
        catch (Exception e) {
            LOGGER.error("Exception while getting messages from SQS", (Throwable)e);
            Monitors.recordObservableQMessageReceivedErrors((String)QUEUE_TYPE);
            return new ArrayList<Message>();
        }
    }

    Observable.OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            Observable interval = Observable.interval((long)this.pollTimeInMS, (TimeUnit)TimeUnit.MILLISECONDS);
            interval.flatMap(x -> {
                if (!this.isRunning()) {
                    LOGGER.debug("Component stopped, skip listening for messages from SQS");
                    return Observable.from(Collections.emptyList());
                }
                List<Message> messages = this.receiveMessages();
                return Observable.from(messages);
            }).subscribe(arg_0 -> ((Subscriber)subscriber).onNext(arg_0), arg_0 -> ((Subscriber)subscriber).onError(arg_0));
        };
    }

    private List<String> delete(List<Message> messages) {
        if (messages == null || messages.isEmpty()) {
            return null;
        }
        DeleteMessageBatchRequest batch = new DeleteMessageBatchRequest().withQueueUrl(this.queueURL);
        List entries = batch.getEntries();
        messages.forEach(m -> entries.add(new DeleteMessageBatchRequestEntry().withId(m.getId()).withReceiptHandle(m.getReceipt())));
        DeleteMessageBatchResult result = this.client.deleteMessageBatch(batch);
        List<String> failures = result.getFailed().stream().map(BatchResultErrorEntry::getId).collect(Collectors.toList());
        LOGGER.debug("Failed to delete messages from queue: {}: {}", (Object)this.queueName, failures);
        return failures;
    }

    public static class Builder {
        private String queueName;
        private int visibilityTimeout = 30;
        private int batchSize = 5;
        private long pollTimeInMS = 100L;
        private AmazonSQS client;
        private List<String> accountsToAuthorize = new LinkedList<String>();
        private Scheduler scheduler;

        public Builder withQueueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        public Builder withVisibilityTimeout(int visibilityTimeout) {
            this.visibilityTimeout = visibilityTimeout;
            return this;
        }

        public Builder withBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        public Builder withClient(AmazonSQS client) {
            this.client = client;
            return this;
        }

        public Builder withPollTimeInMS(long pollTimeInMS) {
            this.pollTimeInMS = pollTimeInMS;
            return this;
        }

        public Builder withAccountsToAuthorize(List<String> accountsToAuthorize) {
            this.accountsToAuthorize = accountsToAuthorize;
            return this;
        }

        public Builder addAccountToAuthorize(String accountToAuthorize) {
            this.accountsToAuthorize.add(accountToAuthorize);
            return this;
        }

        public Builder withScheduler(Scheduler scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        public SQSObservableQueue build() {
            return new SQSObservableQueue(this.queueName, this.client, this.visibilityTimeout, this.batchSize, this.pollTimeInMS, this.accountsToAuthorize, this.scheduler);
        }
    }
}

