/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.integrations.aws.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Preconditions;
import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.commons.collections.CollectionUtils;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSLogMessage;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.cloudwatch.CloudWatchLogEvent;
import org.graylog.integrations.aws.cloudwatch.CloudWatchLogSubscriptionData;
import org.graylog.integrations.aws.cloudwatch.KinesisLogEntry;
import org.graylog.integrations.aws.resources.requests.AWSRequest;
import org.graylog.integrations.aws.resources.requests.CreateRolePermissionRequest;
import org.graylog.integrations.aws.resources.requests.KinesisHealthCheckRequest;
import org.graylog.integrations.aws.resources.requests.KinesisNewStreamRequest;
import org.graylog.integrations.aws.resources.responses.CreateRolePermissionResponse;
import org.graylog.integrations.aws.resources.responses.KinesisHealthCheckResponse;
import org.graylog.integrations.aws.resources.responses.KinesisNewStreamResponse;
import org.graylog.integrations.aws.resources.responses.StreamsResponse;
import org.graylog.integrations.aws.service.AWSService;
import org.graylog.integrations.aws.transports.KinesisPayloadDecoder;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.iam.IamClient;
import software.amazon.awssdk.services.iam.IamClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

public class KinesisService {
    private static final Logger LOG = LoggerFactory.getLogger(AWSService.class);
    private static final int EIGHT_BITS = 8;
    private static final int KINESIS_LIST_STREAMS_MAX_ATTEMPTS = 1000;
    private static final int KINESIS_LIST_STREAMS_LIMIT = 400;
    private static final int RECORDS_SAMPLE_SIZE = 10;
    private static final int SHARD_COUNT = 1;
    private static final String ROLE_NAME_FORMAT = "graylog-cloudwatch-role-%s";
    private static final String ROLE_POLICY_NAME_FORMAT = "graylog-cloudwatch-role-policy-%s";
    private static final String UNIQUE_ROLE_DATE_FORMAT = "yyyy-MM-dd-HH-mm-ss";
    private static final String CONTROL_MESSAGE_TOKEN = "CWL CONTROL MESSAGE";
    private final IamClientBuilder iamClientBuilder;
    private final KinesisClientBuilder kinesisClientBuilder;
    private final ObjectMapper objectMapper;
    private final Map<String, Codec.Factory<? extends Codec>> availableCodecs;
    private final AWSClientBuilderUtil awsClientBuilderUtil;

    @Inject
    public KinesisService(IamClientBuilder iamClientBuilder, KinesisClientBuilder kinesisClientBuilder, ObjectMapper objectMapper, Map<String, Codec.Factory<? extends Codec>> availableCodecs, AWSClientBuilderUtil awsClientBuilderUtil) {
        this.iamClientBuilder = iamClientBuilder;
        this.kinesisClientBuilder = kinesisClientBuilder;
        this.objectMapper = objectMapper;
        this.availableCodecs = availableCodecs;
        this.awsClientBuilderUtil = awsClientBuilderUtil;
    }

    public KinesisHealthCheckResponse healthCheck(KinesisHealthCheckRequest request) throws ExecutionException, IOException {
        LOG.debug("Executing healthCheck");
        LOG.debug("Requesting a list of streams to find out if the indicated stream exists.");
        StreamsResponse kinesisStreamNames = this.getKinesisStreamNames(request);
        boolean streamExists = kinesisStreamNames.streams().stream().anyMatch(streamName -> streamName.equals(request.streamName()));
        if (!streamExists) {
            throw new BadRequestException(String.format(Locale.ROOT, "The requested stream [%s] was not found.", request.streamName()));
        }
        LOG.debug("The stream [{}] exists", (Object)request.streamName());
        KinesisClient kinesisClient = this.awsClientBuilderUtil.buildClient(this.kinesisClientBuilder, (AWSRequest)request);
        List<Record> records = this.retrieveRecords(request.streamName(), kinesisClient);
        if (records.size() == 0) {
            throw new BadRequestException(String.format(Locale.ROOT, "The Kinesis stream [%s] does not contain any messages.", request.streamName()));
        }
        Record record = this.selectRandomRecord(records);
        byte[] payloadBytes = record.data().asByteArray();
        boolean compressed = KinesisService.isCompressed(payloadBytes);
        if (compressed) {
            return this.handleCompressedMessages(request, payloadBytes);
        }
        DateTime timestamp = new DateTime(record.approximateArrivalTimestamp().toEpochMilli(), DateTimeZone.UTC);
        return this.detectAndParseMessage(new String(payloadBytes, StandardCharsets.UTF_8), timestamp, request.streamName(), "", "", compressed);
    }

    public StreamsResponse getKinesisStreamNames(AWSRequest request) throws ExecutionException {
        LOG.debug("List Kinesis streams for region [{}]", (Object)request.region());
        KinesisClient kinesisClient = this.awsClientBuilderUtil.buildClient(this.kinesisClientBuilder, request);
        ListStreamsRequest streamsRequest = (ListStreamsRequest)ListStreamsRequest.builder().limit(Integer.valueOf(400)).build();
        ListStreamsResponse listStreamsResponse = kinesisClient.listStreams(streamsRequest);
        ArrayList<String> streamNames = new ArrayList<String>(listStreamsResponse.streamNames());
        Retryer retryer = RetryerBuilder.newBuilder().retryIfResult(b -> Objects.equals(b, Boolean.TRUE)).retryIfExceptionOfType(LimitExceededException.class).withStopStrategy(StopStrategies.stopAfterAttempt((int)1000)).build();
        if (listStreamsResponse.hasMoreStreams().booleanValue()) {
            try {
                retryer.call(() -> {
                    LOG.debug("Requesting streams...");
                    String lastStreamName = (String)streamNames.get(streamNames.size() - 1);
                    ListStreamsRequest moreStreamsRequest = (ListStreamsRequest)ListStreamsRequest.builder().exclusiveStartStreamName(lastStreamName).limit(Integer.valueOf(400)).build();
                    ListStreamsResponse moreSteamsResponse = kinesisClient.listStreams(moreStreamsRequest);
                    streamNames.addAll(moreSteamsResponse.streamNames());
                    return moreSteamsResponse.hasMoreStreams();
                });
            }
            catch (RetryException e) {
                LOG.error("Failed to get all stream names after {} attempts. Proceeding to return currently obtained streams.", (Object)1000);
            }
        }
        LOG.debug("Kinesis streams queried: [{}]", streamNames);
        if (streamNames.isEmpty()) {
            throw new BadRequestException(String.format(Locale.ROOT, "No Kinesis streams were found in the [%s] region.", request.region()));
        }
        return StreamsResponse.create(streamNames, streamNames.size());
    }

    private KinesisHealthCheckResponse handleCompressedMessages(KinesisHealthCheckRequest request, byte[] payloadBytes) throws IOException {
        LOG.debug("The supplied payload is GZip compressed. Proceeding to decompress.");
        CloudWatchLogSubscriptionData data = KinesisPayloadDecoder.decompressCloudWatchMessages(payloadBytes, this.objectMapper);
        Optional logEntryOptional = data.logEvents().stream().findAny();
        if (logEntryOptional.isEmpty()) {
            throw new BadRequestException("The CloudWatch payload did not contain any messages. This should not happen. See https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html");
        }
        CloudWatchLogEvent logEntry = (CloudWatchLogEvent)logEntryOptional.get();
        DateTime timestamp = new DateTime(logEntry.timestamp(), DateTimeZone.UTC);
        return this.detectAndParseMessage(logEntry.message(), timestamp, request.streamName(), data.logGroup(), data.logStream(), true);
    }

    List<Record> retrieveRecords(String kinesisStream, KinesisClient kinesisClient) {
        LOG.debug("About to retrieve logs records from Kinesis.");
        ListShardsRequest listShardsRequest = (ListShardsRequest)ListShardsRequest.builder().streamName(kinesisStream).build();
        ListShardsResponse listShardsResponse = kinesisClient.listShards(listShardsRequest);
        ArrayList<Record> recordsList = new ArrayList<Record>();
        for (Shard shard : listShardsResponse.shards()) {
            String shardId = shard.shardId();
            GetShardIteratorRequest getShardIteratorRequest = (GetShardIteratorRequest)GetShardIteratorRequest.builder().shardId(shardId).streamName(kinesisStream).shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();
            String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest).shardIterator();
            boolean stayOnCurrentShard = true;
            LOG.debug("Retrieved shard id: [{}] with shard iterator: [{}]", (Object)shardId, (Object)shardIterator);
            while (stayOnCurrentShard) {
                LOG.debug("Getting more records");
                GetRecordsRequest getRecordsRequest = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(shardIterator).build();
                GetRecordsResponse getRecordsResponse = kinesisClient.getRecords(getRecordsRequest);
                shardIterator = getRecordsResponse.nextShardIterator();
                for (Record record : getRecordsResponse.records()) {
                    if (this.isControlMessage(record)) continue;
                    recordsList.add(record);
                    if (recordsList.size() != 10) continue;
                    LOG.debug("Returning the list of records now that sample size [{}] has been met.", (Object)10);
                    return recordsList;
                }
                if (getRecordsResponse.millisBehindLatest() != 0L) continue;
                LOG.debug("Found the end of the shard. No more records returned from the shard.");
                stayOnCurrentShard = false;
            }
        }
        LOG.debug("Returning the list with [{}] records.", (Object)recordsList.size());
        return recordsList;
    }

    private boolean isControlMessage(Record record) {
        byte[] recordData = record.data().asByteArray();
        if (KinesisService.isCompressed(recordData)) {
            try {
                return Tools.decompressGzip(recordData).contains(CONTROL_MESSAGE_TOKEN);
            }
            catch (IOException e) {
                throw new BadRequestException("Failed to decode message from CloudWatch and check if it's a control message.");
            }
        }
        return false;
    }

    private KinesisHealthCheckResponse detectAndParseMessage(String logMessage, DateTime timestamp, String kinesisStreamName, String logGroupName, String logStreamName, boolean compressed) {
        byte[] payload;
        LOG.debug("Attempting to detect the type of log message. message [{}] stream [{}] log group [{}].", new Object[]{logMessage, kinesisStreamName, logGroupName});
        AWSLogMessage awsLogMessage = new AWSLogMessage(logMessage);
        AWSMessageType awsMessageType = awsLogMessage.detectLogMessageType(compressed);
        LOG.debug("The message is type [{}]", (Object)awsMessageType);
        String responseMessage = String.format(Locale.ROOT, "Success. The message is a %s message.", awsMessageType.getLabel());
        KinesisLogEntry logEvent = KinesisLogEntry.create(kinesisStreamName, logGroupName, logStreamName, timestamp, logMessage);
        Codec.Factory<? extends Codec> codecFactory = this.availableCodecs.get(awsMessageType.getCodecName());
        if (codecFactory == null) {
            throw new BadRequestException(String.format(Locale.ROOT, "A codec with name [%s] could not be found.", awsMessageType.getCodecName()));
        }
        Codec codec = codecFactory.create(Configuration.EMPTY_CONFIGURATION);
        try {
            payload = this.objectMapper.writeValueAsBytes((Object)logEvent);
        }
        catch (JsonProcessingException e) {
            throw new BadRequestException("Encoding the message to bytes failed.", (Throwable)e);
        }
        Message fullyParsedMessage = codec.decode(new RawMessage(payload));
        if (fullyParsedMessage == null) {
            throw new BadRequestException(String.format(Locale.ROOT, "Message decoding failed. More information might be available by enabling Debug logging. message [%s]", logMessage));
        }
        LOG.debug("Successfully parsed message type [{}] with codec [{}].", (Object)awsMessageType, (Object)awsMessageType.getCodecName());
        return KinesisHealthCheckResponse.create(awsMessageType, responseMessage, fullyParsedMessage.getFields());
    }

    Record selectRandomRecord(List<Record> recordsList) {
        Preconditions.checkArgument((boolean)CollectionUtils.isNotEmpty(recordsList), (Object)"Records list can not be empty.");
        LOG.debug("Selecting a random Record from the sample list.");
        return recordsList.get(new Random().nextInt(recordsList.size()));
    }

    public static boolean isCompressed(byte[] bytes) {
        if (bytes == null || bytes.length < 2) {
            return false;
        }
        boolean firstByteIsMagicNumber = bytes[0] == 31;
        boolean secondByteIsMagicNumber = bytes[1] == -117;
        return firstByteIsMagicNumber && secondByteIsMagicNumber;
    }

    public KinesisNewStreamResponse createNewKinesisStream(KinesisNewStreamRequest request) {
        LOG.debug("Creating Kinesis client with the provided credentials.");
        KinesisClient kinesisClient = this.awsClientBuilderUtil.buildClient(this.kinesisClientBuilder, (AWSRequest)request);
        LOG.debug("Creating new Kinesis stream request [{}].", (Object)request.streamName());
        CreateStreamRequest createStreamRequest = (CreateStreamRequest)CreateStreamRequest.builder().streamName(request.streamName()).shardCount(Integer.valueOf(1)).build();
        LOG.debug("Sending request to create new Kinesis stream [{}] with [{}] shards.", (Object)request.streamName(), (Object)1);
        try {
            StreamDescription streamDescription;
            kinesisClient.createStream(createStreamRequest);
            int seconds = 0;
            do {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    LOG.error("Request interrupted while waiting for shard to become available.");
                    return null;
                }
                streamDescription = kinesisClient.describeStream((DescribeStreamRequest)DescribeStreamRequest.builder().streamName(request.streamName()).build()).streamDescription();
                if (seconds > 300) {
                    String responseMessage = String.format(Locale.ROOT, "Fail. Stream [%s] has failed to become active within 60 seconds.", request.streamName());
                    throw new BadRequestException(responseMessage);
                }
                ++seconds;
            } while (streamDescription.streamStatus() != StreamStatus.ACTIVE);
            String streamArn = streamDescription.streamARN();
            String responseMessage = String.format(Locale.ROOT, "Success. The new stream [%s/%s] was created with [%d] shard.", request.streamName(), streamArn, 1);
            return KinesisNewStreamResponse.create(createStreamRequest.streamName(), streamArn, responseMessage);
        }
        catch (Exception e) {
            String specificError = ExceptionUtils.formatMessageCause(e);
            String responseMessage = String.format(Locale.ROOT, "Attempt to create [%s] new Kinesis stream with [%d] shards failed due to the following exception: [%s]", request.streamName(), 1, specificError);
            LOG.error(responseMessage, (Throwable)e);
            throw new BadRequestException(responseMessage, (Throwable)e);
        }
    }

    public CreateRolePermissionResponse autoKinesisPermissions(CreateRolePermissionRequest request) {
        String roleName = String.format(Locale.ROOT, ROLE_NAME_FORMAT, DateTime.now((DateTimeZone)DateTimeZone.UTC).toString(UNIQUE_ROLE_DATE_FORMAT));
        try {
            IamClient iamClient = this.awsClientBuilderUtil.buildClient(this.iamClientBuilder, (AWSRequest)request);
            String createRoleResponse = KinesisService.createRoleForKinesisAutoSetup(iamClient, request.region(), roleName);
            LOG.debug(createRoleResponse);
            KinesisService.setPermissionsForKinesisAutoSetupRole(iamClient, roleName, request.streamArn());
            String roleArn = KinesisService.getRolePermissionsArn(iamClient, roleName);
            String explanation = String.format(Locale.ROOT, "Success! The role [%s/%s] has been created.", roleName, roleArn);
            return CreateRolePermissionResponse.create(explanation, roleArn, roleName);
        }
        catch (Exception e) {
            String specificError = ExceptionUtils.formatMessageCause(e);
            String responseMessage = String.format(Locale.ROOT, "Unable to automatically set up Kinesis role [%s] due to the following error [%s]", roleName, specificError);
            throw new BadRequestException(responseMessage);
        }
    }

    private static void setPermissionsForKinesisAutoSetupRole(IamClient iam, String roleName, String streamArn) {
        String rolePolicy = "{\n  \"Statement\": [\n    {\n      \"Effect\": \"Allow\",\n      \"Action\": \"kinesis:PutRecord\",\n      \"Resource\": \"" + streamArn + "\"\n    }\n  ]\n}";
        String rolePolicyName = String.format(Locale.ROOT, ROLE_POLICY_NAME_FORMAT, DateTime.now((DateTimeZone)DateTimeZone.UTC).toString(UNIQUE_ROLE_DATE_FORMAT));
        LOG.debug("Attaching [{}] policy to [{}] role", (Object)rolePolicyName, (Object)roleName);
        try {
            iam.putRolePolicy(r -> r.roleName(roleName).policyName(rolePolicyName).policyDocument(rolePolicy));
            LOG.debug("Success! The role policy [{}] was assigned.", (Object)rolePolicyName);
        }
        catch (Exception e) {
            String specificError = ExceptionUtils.formatMessageCause(e);
            String responseMessage = String.format(Locale.ROOT, "Unable to create role [%s] due to the following error [%s]", roleName, specificError);
            throw new BadRequestException(responseMessage);
        }
    }

    private static String createRoleForKinesisAutoSetup(IamClient iam, String region, String roleName) {
        LOG.debug("Create Kinesis Auto Setup Role [{}] to region [{}]", (Object)roleName, (Object)region);
        String assumeRolePolicy = "{\n  \"Statement\": [\n    {\n      \"Effect\": \"Allow\",\n      \"Principal\": { \"Service\": \"logs." + region + ".amazonaws.com\" },\n      \"Action\": \"sts:AssumeRole\"\n    }\n  ]\n}";
        LOG.debug("Role [{}] was created.", (Object)roleName);
        try {
            iam.createRole(r -> r.roleName(roleName).assumeRolePolicyDocument(assumeRolePolicy));
            return String.format(Locale.ROOT, "Success! The role [%s] was created.", roleName);
        }
        catch (Exception e) {
            String specificError = ExceptionUtils.formatMessageCause(e);
            String responseMessage = String.format(Locale.ROOT, "The role [%s] was not created due to the following reason [%s]", roleName, specificError);
            throw new BadRequestException(responseMessage);
        }
    }

    private static String getRolePermissionsArn(IamClient iamClient, String roleName) {
        LOG.debug("Acquiring the role ARN associated to the role [{}]", (Object)roleName);
        return iamClient.getRole(r -> r.roleName(roleName)).role().arn();
    }
}

