/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.integrations.aws.transports;

import com.codahale.metrics.MetricSet;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import java.net.URL;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.resources.requests.AWSRequestImpl;
import org.graylog.integrations.aws.service.AWSService;
import org.graylog.integrations.aws.transports.KinesisConsumer;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport2;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.security.encryption.EncryptedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

public class KinesisTransport
extends ThrottleableTransport2 {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisTransport.class);
    public static final String NAME = "aws-kinesis-transport";
    private static final String CK_AWS_REGION = "aws_region";
    private static final String CK_ACCESS_KEY = "aws_access_key";
    private static final String CK_SECRET_KEY = "aws_secret_key";
    public static final String CK_KINESIS_STREAM_NAME = "kinesis_stream_name";
    public static final String CK_KINESIS_RECORD_BATCH_SIZE = "kinesis_record_batch_size";
    public static final int DEFAULT_BATCH_SIZE = 10000;
    private final Configuration configuration;
    private final NodeId nodeId;
    private final LocalMetricRegistry localRegistry;
    private final ObjectMapper objectMapper;
    private final AWSClientBuilderUtil awsClientBuilderUtil;
    private final ExecutorService executor;
    private KinesisConsumer kinesisConsumer;

    @Inject
    public KinesisTransport(@Assisted Configuration configuration, EventBus serverEventBus, NodeId nodeId, LocalMetricRegistry localRegistry, ObjectMapper objectMapper, AWSClientBuilderUtil awsClientBuilderUtil) {
        super(serverEventBus, configuration);
        this.configuration = configuration;
        this.nodeId = nodeId;
        this.localRegistry = localRegistry;
        this.objectMapper = objectMapper;
        this.awsClientBuilderUtil = awsClientBuilderUtil;
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aws-kinesis-reader-%d").setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in AWS Kinesis reader.", e)).build());
    }

    @Override
    public void handleChangedThrottledState(boolean isThrottled) {
        if (!isThrottled) {
            LOG.info("Kinesis consumer unthrottled");
        } else {
            LOG.info("Kinesis consumer throttled");
        }
    }

    @Override
    public void doLaunch(MessageInput input, InputFailureRecorder inputFailureRecorder) throws MisfireException {
        Region region = Region.of((String)Objects.requireNonNull(this.configuration.getString(CK_AWS_REGION)));
        String key = this.configuration.getString(CK_ACCESS_KEY);
        EncryptedValue secret = this.configuration.getEncryptedValue(CK_SECRET_KEY);
        String assumeRoleArn = this.configuration.getString("aws_assume_role_arn");
        String dynamodbEndpoint = this.configuration.getString("dynamodb_endpoint");
        String cloudwatchEndpoint = this.configuration.getString("cloudwatch_endpoint");
        String iamEndpoint = this.configuration.getString("iam_endpoint");
        String kinesisEndpoint = this.configuration.getString("kinesis_endpoint");
        KinesisTransport.validateEndpoint(dynamodbEndpoint, "DynamoDB");
        KinesisTransport.validateEndpoint(cloudwatchEndpoint, "CloudWatch");
        KinesisTransport.validateEndpoint(iamEndpoint, "IAM");
        KinesisTransport.validateEndpoint(iamEndpoint, "Kinesis");
        AWSRequestImpl awsRequest = ((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)((AWSRequestImpl.Builder)AWSRequestImpl.builder().region(region.id())).awsAccessKeyId(key)).awsSecretAccessKey(secret)).assumeRoleArn(assumeRoleArn)).cloudwatchEndpoint(cloudwatchEndpoint)).dynamodbEndpoint(dynamodbEndpoint)).iamEndpoint(iamEndpoint)).kinesisEndpoint(kinesisEndpoint)).build();
        int batchSize = this.configuration.getInt(CK_KINESIS_RECORD_BATCH_SIZE, 10000);
        String streamName = this.configuration.getString(CK_KINESIS_STREAM_NAME);
        AWSMessageType awsMessageType = AWSMessageType.valueOf(this.configuration.getString("aws_message_type"));
        this.kinesisConsumer = new KinesisConsumer(this.nodeId, this, this.objectMapper, this.kinesisCallback(input), streamName, awsMessageType, batchSize, awsRequest, this.awsClientBuilderUtil, inputFailureRecorder);
        LOG.debug("Starting Kinesis reader thread for input {}", (Object)input.toIdentifier());
        this.executor.submit(this.kinesisConsumer);
    }

    static void validateEndpoint(String endpoint, String endpointName) throws MisfireException {
        if (StringUtils.isNotEmpty((CharSequence)endpoint)) {
            try {
                new URL(endpoint).toURI();
            }
            catch (Exception e) {
                throw new MisfireException(String.format(Locale.ROOT, "The specified [%s] Override Endpoint [%s] is invalid.", endpointName, endpoint), e);
            }
        }
    }

    private Consumer<byte[]> kinesisCallback(MessageInput input) {
        return data -> input.processRawMessage(new RawMessage((byte[])data));
    }

    @Override
    public void doStop() {
        if (this.kinesisConsumer != null) {
            this.kinesisConsumer.stop();
        }
    }

    @Override
    public void setMessageAggregator(CodecAggregator aggregator) {
    }

    @Override
    public MetricSet getMetricSet() {
        return this.localRegistry;
    }

    @ConfigClass
    public static class Config
    extends ThrottleableTransport.Config {
        @Override
        public ConfigurationRequest getRequestedConfiguration() {
            ConfigurationRequest r = super.getRequestedConfiguration();
            r.addField(new DropdownField(KinesisTransport.CK_AWS_REGION, "AWS Region", Region.US_EAST_1.id(), AWSService.buildRegionChoices(), "The AWS region the Kinesis stream is running in.", ConfigurationField.Optional.NOT_OPTIONAL));
            r.addField(new TextField(KinesisTransport.CK_ACCESS_KEY, "AWS access key", "", "Access key of an AWS user with sufficient permissions. (See documentation)", ConfigurationField.Optional.OPTIONAL));
            r.addField(new TextField(KinesisTransport.CK_SECRET_KEY, "AWS secret key", "", "Secret key of an AWS user with sufficient permissions. (See documentation)", ConfigurationField.Optional.OPTIONAL, TextField.Attribute.IS_PASSWORD));
            r.addField(new TextField(KinesisTransport.CK_KINESIS_STREAM_NAME, "Kinesis Stream name", "", "The name of the Kinesis stream that receives your messages. See README for instructions on how to connect messages to a Kinesis Stream.", ConfigurationField.Optional.NOT_OPTIONAL));
            r.addField(new NumberField(KinesisTransport.CK_KINESIS_RECORD_BATCH_SIZE, "Kinesis Record batch size.", 10000, "The number of Kinesis records to fetch at a time. Each record may be up to 1MB in size. The AWS default is 10,000. Enter a smaller value to process smaller chunks at a time.", ConfigurationField.Optional.OPTIONAL, NumberField.Attribute.ONLY_POSITIVE));
            return r;
        }
    }

    @FactoryClass
    public static interface Factory
    extends Transport.Factory<KinesisTransport> {
        @Override
        public KinesisTransport create(Configuration var1);

        @Override
        public Config getConfig();
    }
}

