/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.integrations.aws.transports;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.resources.requests.AWSRequest;
import org.graylog.integrations.aws.transports.KinesisShardProcessorFactory;
import org.graylog.integrations.aws.transports.KinesisTransport;
import org.graylog2.plugin.InputFailureRecorder;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.NoOpWorkerStateChangeListener;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
import software.amazon.kinesis.lifecycle.NoOpTaskExecutionListener;
import software.amazon.kinesis.lifecycle.TaskExecutionListener;
import software.amazon.kinesis.lifecycle.TaskOutcome;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * {@link Runnable} that consumes one Kinesis stream through a Kinesis Client Library {@link Scheduler}.
 *
 * <p>{@link #run()} builds the DynamoDB, CloudWatch and Kinesis async clients from the supplied
 * {@link AWSRequest}, wires them into a {@link ConfigsBuilder} together with a
 * {@link KinesisShardProcessorFactory}, and then runs the scheduler on the calling thread.
 * {@link #stop()} requests a graceful shutdown of that scheduler and may be invoked from a
 * different thread than the one executing {@link #run()}.
 *
 * <p>Initialization and task failures reported by the scheduler are forwarded to the
 * {@link InputFailureRecorder} passed to the constructor.
 */
public class KinesisConsumer implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);

    /** Maximum time {@link #stop()} waits for the graceful shutdown, in {@link #GRACEFUL_SHUTDOWN_TIMEOUT_UNIT}. */
    private static final int GRACEFUL_SHUTDOWN_TIMEOUT = 20;
    private static final TimeUnit GRACEFUL_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.SECONDS;

    private final String kinesisStreamName;
    private final NodeId nodeId;
    private final KinesisTransport transport;
    private final Integer recordBatchSize;
    private final ObjectMapper objectMapper;
    private final AWSMessageType awsMessageType;
    private final Consumer<byte[]> handleMessageCallback;
    private final AWSRequest request;
    private final AWSClientBuilderUtil awsClientBuilderUtil;
    private final InputFailureRecorder inputFailureRecorder;

    /**
     * Assigned by {@link #run()} and read by {@link #stop()}. Declared {@code volatile} because the two
     * methods can execute on different threads and no other synchronization guards this field.
     */
    private volatile Scheduler kinesisScheduler;

    /**
     * Creates a consumer for the given stream. No AWS resources are created until {@link #run()} is called.
     *
     * @param nodeId                used to derive the worker id handed to the Kinesis Client Library; must not be null
     * @param transport             passed through to the {@link KinesisShardProcessorFactory}
     * @param objectMapper          passed through to the {@link KinesisShardProcessorFactory}
     * @param handleMessageCallback passed through to the {@link KinesisShardProcessorFactory}
     * @param kinesisStreamName     name of the stream to consume; must not be blank
     * @param awsMessageType        message type passed to the {@link KinesisShardProcessorFactory}; must not be null
     * @param recordBatchSize       value applied as {@code maxRecords} on the polling configuration
     * @param request               supplies the region, the service endpoints and the input for the credentials provider
     * @param awsClientBuilderUtil  creates the credentials provider and initializes the AWS client builders
     * @param inputFailureRecorder  receives failing/running state updates from the scheduler listeners
     * @throws IllegalArgumentException if {@code kinesisStreamName} is blank
     * @throws NullPointerException     if {@code awsMessageType} or {@code nodeId} is null
     */
    KinesisConsumer(NodeId nodeId, KinesisTransport transport, ObjectMapper objectMapper, Consumer<byte[]> handleMessageCallback, String kinesisStreamName, AWSMessageType awsMessageType, int recordBatchSize, AWSRequest request, AWSClientBuilderUtil awsClientBuilderUtil, InputFailureRecorder inputFailureRecorder) {
        Preconditions.checkArgument(StringUtils.isNotBlank(kinesisStreamName), "A Kinesis stream name is required.");
        Preconditions.checkNotNull(awsMessageType, "A AWSMessageType is required.");
        this.nodeId = Objects.requireNonNull(nodeId, "nodeId");
        this.transport = transport;
        this.handleMessageCallback = handleMessageCallback;
        this.kinesisStreamName = Objects.requireNonNull(kinesisStreamName, "kinesisStream");
        this.objectMapper = objectMapper;
        this.awsMessageType = awsMessageType;
        this.recordBatchSize = recordBatchSize;
        this.request = request;
        this.awsClientBuilderUtil = awsClientBuilderUtil;
        this.inputFailureRecorder = inputFailureRecorder;
    }

    /**
     * Builds the Kinesis scheduler, publishes it for {@link #stop()}, and runs it on the calling thread.
     * Returns once {@link Scheduler#run()} returns.
     */
    @Override
    public void run() {
        LOG.debug("Starting the Kinesis Consumer.");

        final Scheduler scheduler = createScheduler();
        // Publish before running so that a concurrent stop() can find the scheduler.
        this.kinesisScheduler = scheduler;

        LOG.debug("Starting Kinesis scheduler.");
        scheduler.run();
        LOG.debug("After Kinesis scheduler stopped.");
    }

    /**
     * Requests a graceful shutdown of the scheduler and waits up to
     * {@link #GRACEFUL_SHUTDOWN_TIMEOUT} {@link #GRACEFUL_SHUTDOWN_TIMEOUT_UNIT} for it to finish.
     *
     * <p>Does nothing if {@link #run()} has not created the scheduler yet. If the wait times out,
     * {@link Scheduler#shutdown()} is called. If the calling thread is interrupted while waiting,
     * the wait is abandoned and the thread's interrupt status is restored.
     */
    public void stop() {
        // Read the shared field once so the null check and the later calls see the same instance.
        final Scheduler scheduler = this.kinesisScheduler;
        if (scheduler == null) {
            return;
        }

        final CompletableFuture<?> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        LOG.info("Waiting up to {} {} for Kinesis Consumer shutdown to complete.",
                GRACEFUL_SHUTDOWN_TIMEOUT, GRACEFUL_SHUTDOWN_TIMEOUT_UNIT.name().toLowerCase(Locale.ROOT));
        try {
            gracefulShutdownFuture.get(GRACEFUL_SHUTDOWN_TIMEOUT, GRACEFUL_SHUTDOWN_TIMEOUT_UNIT);
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for graceful shutdown. Continuing.");
            // Restore the interrupt status so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            LOG.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
            scheduler.shutdown();
        }
    }

    /**
     * Creates the AWS clients and assembles a {@link Scheduler} that polls {@link #kinesisStreamName}.
     *
     * <p>NOTE(review): the async clients created here are never closed by this class. Confirm whether
     * they should be closed once the scheduler has exited.
     */
    private Scheduler createScheduler() {
        final AwsCredentialsProvider credentialsProvider = awsClientBuilderUtil.createCredentialsProvider(request);
        final Region region = Region.of(request.region());

        final DynamoDbAsyncClientBuilder dynamoDbClientBuilder = DynamoDbAsyncClient.builder();
        awsClientBuilderUtil.initializeBuilder(dynamoDbClientBuilder, request.dynamodbEndpoint(), region, credentialsProvider);
        final DynamoDbAsyncClient dynamoClient = dynamoDbClientBuilder.build();

        final CloudWatchAsyncClientBuilder cloudwatchClientBuilder = CloudWatchAsyncClient.builder();
        awsClientBuilderUtil.initializeBuilder(cloudwatchClientBuilder, request.cloudwatchEndpoint(), region, credentialsProvider);
        final CloudWatchAsyncClient cloudWatchClient = cloudwatchClientBuilder.build();

        final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder();
        awsClientBuilderUtil.initializeBuilder(kinesisAsyncClientBuilder, request.kinesisEndpoint(), region, credentialsProvider);
        final KinesisAsyncClient kinesisAsyncClient = KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);

        final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize());
        LOG.debug("Using workerId [{}].", workerId);

        final String applicationName = String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName);
        LOG.debug("Using Kinesis applicationName [{}].", applicationName);

        final ShardRecordProcessorFactory processorFactory =
                new KinesisShardProcessorFactory(objectMapper, transport, handleMessageCallback, kinesisStreamName, awsMessageType);
        final ConfigsBuilder configsBuilder = new ConfigsBuilder(
                kinesisStreamName, applicationName, kinesisAsyncClient, dynamoClient, cloudWatchClient, workerId, processorFactory);

        final PollingConfig pollingConfig = new PollingConfig(kinesisStreamName, kinesisAsyncClient);
        if (recordBatchSize != null) {
            LOG.debug("Using explicit batch size [{}]", recordBatchSize);
            pollingConfig.maxRecords(recordBatchSize);
        }

        return new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig().workerStateChangeListener(createWorkerStateChangeListener()),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig().taskExecutionListener(createTaskExecutionListener()),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig));
    }

    /** Creates the listener that marks the input as failing when all scheduler initialization attempts failed. */
    private WorkerStateChangeListener createWorkerStateChangeListener() {
        return new NoOpWorkerStateChangeListener() {

            public void onAllInitializationAttemptsFailed(Throwable e) {
                inputFailureRecorder.setFailing(KinesisConsumer.class,
                        String.format(Locale.ROOT, "Initialization for Kinesis stream <%s> failed.", kinesisStreamName), e);
            }
        };
    }

    /**
     * Creates the listener that marks the input as failing after any failed task, and as running again
     * after a successful {@link TaskType#PROCESS} task.
     */
    private TaskExecutionListener createTaskExecutionListener() {
        return new NoOpTaskExecutionListener() {

            public void afterTaskExecution(TaskExecutionListenerInput input) {
                if (TaskOutcome.FAILURE.equals(input.taskOutcome())) {
                    inputFailureRecorder.setFailing(KinesisConsumer.class,
                            String.format(Locale.ROOT, "Errors for Kinesis stream <%s>!", kinesisStreamName));
                } else if (TaskOutcome.SUCCESSFUL.equals(input.taskOutcome()) && TaskType.PROCESS.equals(input.taskType())) {
                    inputFailureRecorder.setRunning();
                }
            }
        };
    }
}

