/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.aws.inputs.cloudtrail;

import com.amazonaws.regions.Region;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
import org.graylog.aws.auth.AWSAuthProvider;
import org.graylog.aws.inputs.cloudtrail.json.CloudTrailRecord;
import org.graylog.aws.inputs.cloudtrail.messages.TreeReader;
import org.graylog.aws.inputs.cloudtrail.notifications.CloudtrailSNSNotification;
import org.graylog.aws.inputs.cloudtrail.notifications.CloudtrailSQSClient;
import org.graylog.aws.s3.S3Reader;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.utilities.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background thread that repeatedly asks a {@link CloudtrailSQSClient} for CloudTrail
 * notifications, reads the referenced objects through an {@link S3Reader}, and hands every
 * contained {@link CloudTrailRecord} to the owning {@link MessageInput} as a JSON-encoded
 * {@link RawMessage}.
 *
 * <p>The thread can be paused, resumed and terminated from other threads via {@link #pause()},
 * {@link #unpause()} and {@link #terminate()}. Those three state transitions are serialised on a
 * private lock so that the latch the worker blocks on is always the one that a later
 * {@code unpause()} or {@code terminate()} releases.
 */
public class CloudTrailSubscriber
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(CloudTrailSubscriber.class);

    /** Number of seconds the worker sleeps after a polling round ended (empty queue or SQS error). */
    public static final int SLEEP_INTERVAL_SECS = 5;

    /**
     * Guards the pause/unpause/terminate transitions. A dedicated object is used instead of
     * {@code this} because {@link Thread#join()} already waits on the thread object's monitor.
     */
    private final Object stateLock = new Object();
    private volatile boolean stopped = false;
    private volatile boolean paused = false;
    /** Latch the worker awaits while paused; starts released so an un-paused worker never blocks. */
    private volatile CountDownLatch pausedLatch = new CountDownLatch(0);
    private final MessageInput sourceInput;
    private final Region sqsRegion;
    private final Region s3Region;
    private final String queueName;
    private final AWSAuthProvider authProvider;
    private final HttpUrl proxyUrl;
    private final ObjectMapper objectMapper;
    private final InputFailureRecorder inputFailureRecorder;

    /**
     * Creates a subscriber; no clients are built and nothing is polled until the thread runs.
     *
     * @param sqsRegion            region handed to the {@link CloudtrailSQSClient}
     * @param s3Region             region handed to the {@link S3Reader}
     * @param queueName            queue name handed to the {@link CloudtrailSQSClient}
     * @param sourceInput          input that receives every record as a {@link RawMessage}
     * @param authProvider         credentials provider handed to both clients
     * @param proxyUrl             proxy URL handed to both clients
     * @param objectMapper         mapper used by the SQS client, the {@link TreeReader} and for
     *                             serialising records
     * @param inputFailureRecorder recorder that is told about failures and recoveries
     */
    public CloudTrailSubscriber(Region sqsRegion, Region s3Region, String queueName, MessageInput sourceInput, AWSAuthProvider authProvider, HttpUrl proxyUrl, ObjectMapper objectMapper, InputFailureRecorder inputFailureRecorder) {
        this.sqsRegion = sqsRegion;
        this.s3Region = s3Region;
        this.queueName = queueName;
        this.authProvider = authProvider;
        this.sourceInput = sourceInput;
        this.proxyUrl = proxyUrl;
        this.objectMapper = objectMapper;
        this.inputFailureRecorder = inputFailureRecorder;
    }

    /**
     * Asks the worker to block before its next poll until {@link #unpause()} or
     * {@link #terminate()} is called. Calling this while already paused, or after
     * {@link #terminate()}, has no effect.
     */
    public void pause() {
        synchronized (this.stateLock) {
            // Replacing the latch while already paused would orphan a latch the worker may be
            // blocked on; pausing a terminated subscriber could park it with nobody to release it.
            if (this.stopped || this.paused) {
                return;
            }
            // Install the fresh latch BEFORE publishing the flag: a worker that observes
            // paused == true is then guaranteed to also observe the unreleased latch.
            this.pausedLatch = new CountDownLatch(1);
            this.paused = true;
        }
    }

    /**
     * Releases a worker blocked by {@link #pause()} and clears the paused flag. Safe to call when
     * not paused. (Named {@code unpause} rather than {@code resume} because {@link Thread}
     * already declares a {@code resume()} method.)
     */
    public void unpause() {
        synchronized (this.stateLock) {
            this.paused = false;
            this.pausedLatch.countDown();
        }
    }

    /**
     * Worker loop: builds the SQS client, record reader and S3 reader, then alternates between
     * draining the queue and sleeping {@link #SLEEP_INTERVAL_SECS} seconds until
     * {@link #terminate()} is called.
     */
    @Override
    public void run() {
        LOG.debug("Starting CloudTrailSubscriber");
        CloudtrailSQSClient subscriber = new CloudtrailSQSClient(this.sqsRegion, this.queueName, this.authProvider, this.proxyUrl, this.objectMapper);
        TreeReader reader = new TreeReader(this.objectMapper);
        S3Reader s3Reader = new S3Reader(this.s3Region, this.proxyUrl, this.authProvider);
        while (!this.stopped) {
            this.drainQueue(subscriber, reader, s3Reader);
            // Skip the sleep when we left the drain loop because of termination.
            if (!this.stopped) {
                LOG.debug("Waiting {} seconds until next CloudTrail SQS check.", SLEEP_INTERVAL_SECS);
                Uninterruptibles.sleepUninterruptibly(SLEEP_INTERVAL_SECS, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Signals the worker loop to exit and releases it if it is currently blocked by
     * {@link #pause()}. A sleep or poll already in progress is not interrupted by this method.
     */
    public void terminate() {
        synchronized (this.stateLock) {
            this.stopped = true;
            this.paused = false;
            this.pausedLatch.countDown();
        }
    }

    /**
     * Fetches and processes notification batches back to back. Returns when the subscriber was
     * stopped, fetching notifications threw, or a fetch returned no notifications.
     */
    private void drainQueue(CloudtrailSQSClient subscriber, TreeReader reader, S3Reader s3Reader) {
        while (!this.stopped) {
            if (this.paused) {
                LOG.debug("Processing paused");
                Uninterruptibles.awaitUninterruptibly(this.pausedLatch);
            }
            // terminate() also releases the latch, so re-check before doing any work.
            if (this.stopped) {
                return;
            }
            List<CloudtrailSNSNotification> notifications;
            try {
                notifications = subscriber.getNotifications();
            }
            catch (Exception e) {
                this.inputFailureRecorder.setFailing(this.getClass(), "Could not read messages from SQS. This is most likely a misconfiguration of the plugin. Going into sleep loop and retrying.", e);
                return;
            }
            LOG.debug("Subscriber returned [{}] notifications.", notifications.size());
            if (notifications.isEmpty()) {
                LOG.debug("No more messages to read from SQS. Going into sleep loop.");
                return;
            }
            LOG.debug("Proceeding to read message content from S3.");
            for (CloudtrailSNSNotification notification : notifications) {
                this.processNotification(subscriber, reader, s3Reader, notification);
            }
        }
    }

    /**
     * Reads the object referenced by one notification, forwards each record to the input, then
     * deletes the notification and marks the input as running. Any exception is reported to the
     * failure recorder and the notification is left undeleted.
     */
    private void processNotification(CloudtrailSQSClient subscriber, TreeReader reader, S3Reader s3Reader, CloudtrailSNSNotification notification) {
        try {
            LOG.debug("Checking for CloudTrail notifications in SQS.");
            List<CloudTrailRecord> records = reader.read(s3Reader.readCompressed(notification.getS3Bucket(), notification.getS3ObjectKey()));
            LOG.debug("[{}] records read from S3.", records.size());
            for (CloudTrailRecord record : records) {
                LOG.debug("Processing message content.");
                // Pretty-printing is only done when trace logging is actually on.
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing cloud trail record: {}", this.objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(record));
                }
                this.sourceInput.processRawMessage(new RawMessage(this.objectMapper.writeValueAsBytes(record)));
            }
            // Delete only after every record was handed over without an exception.
            subscriber.deleteNotification(notification);
            this.inputFailureRecorder.setRunning();
        }
        catch (Exception e) {
            this.inputFailureRecorder.setFailing(this.getClass(), StringUtils.f("Could not read CloudTrail log file for <%s>. Skipping.", notification.getS3Bucket()), e);
        }
    }
}

