/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connector.amazon.kinesis.internal.source.consumer;

import com.mulesoft.connector.amazon.kinesis.api.AbsolutePosition;
import com.mulesoft.connector.amazon.kinesis.api.RecordAttributes;
import com.mulesoft.connector.amazon.kinesis.api.ShardConfig;
import com.mulesoft.connector.amazon.kinesis.api.ShardInitialPosition;
import com.mulesoft.connector.amazon.kinesis.api.ShardPrioritization;
import com.mulesoft.connector.amazon.kinesis.api.StreamInitialPosition;
import com.mulesoft.connector.amazon.kinesis.internal.connection.provider.parameter.CommonListenerParameters;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.DynamoDbResourceInUseException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.DynamoDbThroughputExceededException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.IllegalStateException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidShardException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidStreamException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.KeyManagementServiceException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.KinesisRuntimeException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.LimitExceededException;
import com.mulesoft.connector.amazon.kinesis.internal.error.exception.UnauthorizedException;
import com.mulesoft.connector.amazon.kinesis.internal.source.consumer.Consumer;
import com.mulesoft.connector.amazon.kinesis.internal.source.filter.ShardFilter;
import com.mulesoft.connector.amazon.kinesis.internal.source.filter.ShardFilterWithParentPrioritization;
import com.mulesoft.connector.amazon.kinesis.internal.source.processor.KinesisRecordProcessor;
import java.io.InputStream;
import java.math.BigInteger;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
import software.amazon.awssdk.services.dynamodb.model.Condition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.KmsAccessDeniedException;
import software.amazon.awssdk.services.kinesis.model.KmsDisabledException;
import software.amazon.awssdk.services.kinesis.model.KmsInvalidStateException;
import software.amazon.awssdk.services.kinesis.model.KmsNotFoundException;
import software.amazon.awssdk.services.kinesis.model.KmsOptInRequiredException;
import software.amazon.awssdk.services.kinesis.model.KmsThrottlingException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.NoOpShardPrioritization;
import software.amazon.kinesis.leases.ParentsFirstShardPrioritization;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStats;

public class ConsumerImpl
implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerImpl.class);
    private static final int MAX_DYNAMODB_BATCH_SIZE = 25;
    private static final String LEASE_KEY = "leaseKey";
    private final CommonListenerParameters commonParams;
    private final StreamInitialPosition streamInitialPosition;
    private final KinesisAsyncClient kinesisAsyncClient;
    private final KinesisClient kinesisSyncClient;
    private final DynamoDbAsyncClient dynamoClient;
    private final CloudWatchAsyncClient cloudWatchClient;
    private final int maxBatchSize;
    private final String streamName;
    private final String workerId;
    private final String applicationName;
    private final Map<String, KinesisRecordProcessor> processors;
    private ScheduledFuture<?> scheduledFuture;
    private Scheduler kclScheduler;
    private final String leaseTableName;
    private final String coordinatorStateTableName;
    private final String workerMetricStatsTableName;

    public ConsumerImpl(String consumerId, CommonListenerParameters commonParams, StreamInitialPosition streamInitialPosition, KinesisAsyncClient kinesisAsyncClient, KinesisClient kinesisSyncClient, DynamoDbAsyncClient dynamoClient, CloudWatchAsyncClient cloudWatchClient) {
        this.workerId = consumerId + UUID.randomUUID().toString();
        this.applicationName = commonParams.getApplicationName();
        this.streamName = commonParams.getStreamName();
        this.commonParams = commonParams;
        this.streamInitialPosition = streamInitialPosition;
        this.kinesisAsyncClient = kinesisAsyncClient;
        this.kinesisSyncClient = kinesisSyncClient;
        this.dynamoClient = dynamoClient;
        this.cloudWatchClient = cloudWatchClient;
        this.maxBatchSize = commonParams.getMaxBatchSize();
        this.processors = new ConcurrentHashMap<String, KinesisRecordProcessor>();
        this.leaseTableName = this.applicationName;
        this.coordinatorStateTableName = this.applicationName + "-CoordinatorState";
        this.workerMetricStatsTableName = this.applicationName + "-WorkerMetricStats";
        this.prepareInternalDynamoDBTable(commonParams, streamInitialPosition);
    }

    @Override
    public void checkpoint() {
        this.processors.values().forEach(KinesisRecordProcessor::checkpoint);
    }

    @Override
    public void checkpoint(String shardId) {
        this.getKinesisRecordProcessor(shardId).checkpoint();
    }

    @Override
    public void checkpoint(String shardId, String sequenceNumber, Long subsequenceNumber) {
        this.getKinesisRecordProcessor(shardId).checkpoint(sequenceNumber, subsequenceNumber);
    }

    @Override
    public void checkpoint(String shardId, String sequenceNumber) {
        this.getKinesisRecordProcessor(shardId).checkpoint(sequenceNumber, null);
    }

    private KinesisRecordProcessor getKinesisRecordProcessor(String shardId) {
        return Optional.ofNullable(this.processors.get(shardId)).orElseThrow(() -> new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException(String.format("No matching record processor found for shard ID '%s'", shardId)));
    }

    @Override
    public String getApplicationName() {
        return this.applicationName;
    }

    @Override
    public String getStreamName() {
        return this.streamName;
    }

    public void subscribeRecordProcessor(String shardId, KinesisRecordProcessor recordProcessor) {
        this.processors.put(shardId, recordProcessor);
    }

    @Override
    public void unsubscribeRecordProcessor(String shardId) {
        this.processors.remove(shardId);
    }

    @Override
    public boolean consumesShard(String shardId) {
        return this.processors.containsKey(shardId) && this.processors.get(shardId).hasCheckpointer();
    }

    @Override
    public void startConsumer(org.mule.runtime.api.scheduler.Scheduler muleKclScheduler, SourceCallback<List<Result<InputStream, RecordAttributes>>, Void> sourceCallback) {
        ShardRecordProcessorFactory factory = () -> new KinesisRecordProcessor(sourceCallback, this);
        ConfigsBuilder configsBuilder = new ConfigsBuilder(this.streamName, this.applicationName, this.kinesisAsyncClient, this.dynamoClient, this.cloudWatchClient, this.workerId, factory);
        this.kclScheduler = new Scheduler(configsBuilder.checkpointConfig(), this.getCoordinatorConfig(configsBuilder, this.commonParams), this.getLeaseManagementConfig(configsBuilder, this.commonParams), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), this.getRetrievalConfig(configsBuilder, this.commonParams, this.streamInitialPosition, this.kinesisAsyncClient));
        this.scheduledFuture = muleKclScheduler.schedule((Runnable)this.kclScheduler, 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void stop() {
        try {
            this.kclScheduler.startGracefulShutdown().get(30L, TimeUnit.SECONDS);
            this.scheduledFuture.get(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.info("Interrupted while waiting for graceful shutdown. Continuing.", (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException e) {
            logger.error("Exception while executing graceful shutdown.", (Throwable)e);
        }
    }

    private CoordinatorConfig getCoordinatorConfig(ConfigsBuilder configsBuilder, CommonListenerParameters commonParams) {
        Object shardPrioritization;
        CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
        if (!commonParams.getShardConfigs().isEmpty()) {
            List<String> shardIDs = commonParams.getShardConfigs().stream().map(ShardConfig::getShardId).collect(Collectors.toList());
            shardPrioritization = ShardPrioritization.PARENTS_FIRST_SHARD_PRIORITIZATION.equals((Object)commonParams.getShardPrioritization()) ? new ShardFilterWithParentPrioritization(shardIDs, commonParams.getMaxDepth()) : new ShardFilter(shardIDs);
        } else {
            shardPrioritization = ShardPrioritization.PARENTS_FIRST_SHARD_PRIORITIZATION.equals((Object)commonParams.getShardPrioritization()) ? new ParentsFirstShardPrioritization(commonParams.getMaxDepth()) : new NoOpShardPrioritization();
        }
        coordinatorConfig.shardPrioritization((software.amazon.kinesis.leases.ShardPrioritization)shardPrioritization);
        coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
        return coordinatorConfig;
    }

    private LeaseManagementConfig getLeaseManagementConfig(ConfigsBuilder configsBuilder, CommonListenerParameters commonParams) {
        LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig();
        leaseManagementConfig.maxLeasesForWorker(commonParams.getMaxLeasesForWorker());
        leaseManagementConfig.failoverTimeMillis(commonParams.getFailoverTimeInMillis());
        return leaseManagementConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareInternalDynamoDBTable(CommonListenerParameters commonParams, StreamInitialPosition streamInitialPosition) {
        Class<ConsumerImpl> clazz = ConsumerImpl.class;
        synchronized (ConsumerImpl.class) {
            List<String> neverLeasedIds;
            List<ShardConfig> finalShardConfigs;
            this.createKinesisMetadataTables(commonParams);
            // ** MonitorExit[var3_3] (shouldn't be in output)
            Map<String, Shard> shardMap = this.fetchKinesisShardMap(commonParams.getStreamName());
            if (!commonParams.getShardConfigs().isEmpty()) {
                finalShardConfigs = commonParams.getShardConfigs();
            } else {
                finalShardConfigs = new ArrayList<ShardConfig>();
                AbsolutePosition absolutePosition = streamInitialPosition.getAbsolutePosition();
                LocalDateTime timestamp = streamInitialPosition.getTimestamp();
                for (String shardId : shardMap.keySet()) {
                    finalShardConfigs.add(new ShardConfig(shardId, new ShardInitialPosition(absolutePosition, timestamp, null, null)));
                }
            }
            List<String> filteredShardIds = finalShardConfigs.stream().filter(shardConfig -> shardMap.containsKey(shardConfig.getShardId()) && this.isSequenceNumberValid((Shard)shardMap.get(shardConfig.getShardId()), (ShardConfig)shardConfig)).map(ShardConfig::getShardId).collect(Collectors.toList());
            try {
                neverLeasedIds = this.getNeverLeasedShards(filteredShardIds);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof RequestLimitExceededException || e.getCause() instanceof software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException) {
                    throw new DynamoDbThroughputExceededException(e.getCause());
                }
                neverLeasedIds = filteredShardIds;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KinesisRuntimeException(e);
            }
            List<String> finalNeverLeasedIds = neverLeasedIds;
            this.dynamodbBatchWrite(commonParams, finalShardConfigs, finalNeverLeasedIds, shardMap);
            return;
        }
    }

    private void dynamodbBatchWrite(CommonListenerParameters commonParams, List<ShardConfig> finalShardConfigs, List<String> finalNeverLeasedIds, Map<String, Shard> shardMap) {
        AtomicInteger counter = new AtomicInteger();
        List batchWriteItemRequests = finalShardConfigs.stream().filter(shardConfig -> finalNeverLeasedIds.contains(shardConfig.getShardId())).collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 25)).values().stream().map(list -> {
            LinkedList itemsToCreate = new LinkedList();
            list.forEach(shardConfig -> {
                HashMap<String, Object> itemFields = new HashMap<String, Object>();
                itemFields.put(LEASE_KEY, AttributeValue.builder().s(shardConfig.getShardId()).build());
                itemFields.put("checkpoint", AttributeValue.builder().s(this.getCheckpoint(commonParams, (ShardConfig)shardConfig)).build());
                itemFields.put("leaseOwner", AttributeValue.builder().s(this.workerId).build());
                itemFields.put("ownerSwitchesSinceCheckpoint", AttributeValue.builder().n("0").build());
                itemFields.put("checkpointSubSequenceNumber", AttributeValue.builder().n("0").build());
                itemFields.put("leaseCounter", AttributeValue.builder().n("0").build());
                Shard shard = (Shard)shardMap.get(shardConfig.getShardId());
                if (shard != null && shard.parentShardId() != null) {
                    itemFields.put("parentShardId", AttributeValue.builder().s(shard.parentShardId()).build());
                }
                WriteRequest writeRequest = (WriteRequest)WriteRequest.builder().putRequest(putRequestBuilder -> putRequestBuilder.item(itemFields)).build();
                itemsToCreate.add(writeRequest);
            });
            if (itemsToCreate.isEmpty()) {
                throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException("The shard IDs you specified don't exist in this stream.");
            }
            return (BatchWriteItemRequest)BatchWriteItemRequest.builder().requestItems(Collections.singletonMap(this.leaseTableName, itemsToCreate)).build();
        }).collect(Collectors.toList());
        try {
            this.waitForTableCreation(this.leaseTableName);
            this.waitForTableCreation(this.coordinatorStateTableName);
            this.waitForTableCreation(this.workerMetricStatsTableName);
            for (BatchWriteItemRequest batchWriteItemRequest : batchWriteItemRequests) {
                this.dynamoClient.batchWriteItem(batchWriteItemRequest).get();
            }
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof ResourceNotFoundException) {
                throw new InvalidStreamException(e.getCause());
            }
            if (e.getCause() instanceof RequestLimitExceededException || e.getCause() instanceof software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException) {
                throw new DynamoDbThroughputExceededException(e.getCause());
            }
            throw this.wrapException(e.getCause());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KinesisRuntimeException(e);
        }
    }

    private String getCheckpoint(CommonListenerParameters commonParams, ShardConfig shardConfig) {
        String checkpoint = "LATEST";
        if (shardConfig.getShardInitialPosition().getAtSequenceNumber() != null) {
            checkpoint = shardConfig.getShardInitialPosition().getAtSequenceNumber();
        } else if (shardConfig.getShardInitialPosition().getAfterSequenceNumber() != null) {
            checkpoint = this.getSequenceNumberWithRetry(commonParams.getStreamName(), shardConfig.getShardId(), shardConfig.getShardInitialPosition().getAfterSequenceNumber());
        } else if (shardConfig.getShardInitialPosition().getAbsolutePosition() != null) {
            checkpoint = shardConfig.getShardInitialPosition().getAbsolutePosition().equals((Object)AbsolutePosition.LATEST) ? "LATEST" : "TRIM_HORIZON";
        } else if (shardConfig.getShardInitialPosition().getTimestamp() != null) {
            checkpoint = this.getSequenceNumberWithRetry(commonParams.getStreamName(), shardConfig.getShardId(), shardConfig.getShardInitialPosition().getTimestamp());
        }
        return checkpoint;
    }

    private void createKinesisMetadataTables(CommonListenerParameters commonParams) throws KinesisRuntimeException {
        CompletableFuture<Void> leaseTableCreationFuture = CompletableFuture.runAsync(() -> this.createTableIfNotExists(KINESIS_METADATA_TABLE_TYPE.LEASE_TABLE, this.leaseTableName, commonParams));
        CompletableFuture<Void> coordinatorStateTableCreationFuture = CompletableFuture.runAsync(() -> this.createTableIfNotExists(KINESIS_METADATA_TABLE_TYPE.COORDINATOR_STATE_TABLE, this.coordinatorStateTableName, commonParams));
        CompletableFuture<Void> workerMatricsStatsTableCreationFuture = CompletableFuture.runAsync(() -> this.createTableIfNotExists(KINESIS_METADATA_TABLE_TYPE.WORKER_MATRIX_STATS_TABLE, this.workerMetricStatsTableName, commonParams));
        try {
            CompletableFuture.allOf(leaseTableCreationFuture, coordinatorStateTableCreationFuture, workerMatricsStatsTableCreationFuture).thenAccept(v -> logger.info("All Kinesis metadata table requirements are fulfilled successfully."));
        }
        catch (CompletionException e) {
            logger.info("Kinesis metadata tables COULD NOT be created successfully.");
            if (e.getCause() instanceof KinesisRuntimeException) {
                throw new KinesisRuntimeException(e.getCause().getCause());
            }
            throw this.wrapException(e.getCause().getCause());
        }
    }

    private void createTableIfNotExists(KINESIS_METADATA_TABLE_TYPE tableType, String tableName, CommonListenerParameters commonParams) throws KinesisRuntimeException {
        try {
            this.dynamoClient.describeTable((DescribeTableRequest)DescribeTableRequest.builder().tableName(tableName).build()).get();
            logger.debug("Kinesis metadata table [{}] already exists, so skipping table creation.", (Object)tableName);
        }
        catch (ExecutionException e) {
            if (e.getCause().getCause() instanceof ResourceNotFoundException) {
                this.createTable(tableType, tableName, commonParams);
                logger.debug("Kinesis metadata table [{}] created successfully.", (Object)tableName);
            }
            throw this.wrapException(e.getCause());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KinesisRuntimeException(e);
        }
    }

    private Map<String, Shard> fetchKinesisShardMap(String streamName) {
        Map<String, Shard> shardMap;
        try {
            ListShardsResponse listShardsResponse = this.kinesisSyncClient.listShards((ListShardsRequest)ListShardsRequest.builder().streamName(streamName).build());
            shardMap = listShardsResponse.shards().stream().collect(Collectors.toMap(Shard::shardId, Function.identity()));
            while (listShardsResponse.nextToken() != null) {
                listShardsResponse = this.kinesisSyncClient.listShards((ListShardsRequest)ListShardsRequest.builder().nextToken(listShardsResponse.nextToken()).build());
                shardMap.putAll(listShardsResponse.shards().stream().collect(Collectors.toMap(Shard::shardId, Function.identity())));
            }
        }
        catch (software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException e) {
            throw new InvalidShardException(e);
        }
        catch (InvalidArgumentException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException(e);
        }
        catch (software.amazon.awssdk.services.kinesis.model.LimitExceededException e) {
            throw new LimitExceededException(e);
        }
        return shardMap;
    }

    private RetrievalConfig getRetrievalConfig(ConfigsBuilder configsBuilder, CommonListenerParameters commonParams, StreamInitialPosition streamInitialPosition, KinesisAsyncClient kinesisAsyncClient) {
        InitialPositionInStreamExtended position = InitialPositionInStreamExtended.newInitialPosition((InitialPositionInStream)InitialPositionInStream.LATEST);
        if (Objects.nonNull(streamInitialPosition.getTimestamp())) {
            Date date = Date.from(streamInitialPosition.getTimestamp().atZone(ZoneId.systemDefault()).toInstant());
            position = InitialPositionInStreamExtended.newInitialPositionAtTimestamp((Date)date);
        } else if (AbsolutePosition.FIRST.equals((Object)streamInitialPosition.getAbsolutePosition())) {
            position = InitialPositionInStreamExtended.newInitialPosition((InitialPositionInStream)InitialPositionInStream.TRIM_HORIZON);
        } else if (AbsolutePosition.LATEST.equals((Object)streamInitialPosition.getAbsolutePosition())) {
            position = InitialPositionInStreamExtended.newInitialPosition((InitialPositionInStream)InitialPositionInStream.LATEST);
        }
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig((RetrievalSpecificConfig)new PollingConfig(commonParams.getStreamName(), kinesisAsyncClient).maxRecords(this.maxBatchSize));
        retrievalConfig.streamTracker((StreamTracker)new SingleStreamTracker(commonParams.getStreamName(), position));
        return retrievalConfig;
    }

    private String getSequenceNumberWithRetry(String streamName, String shardId, LocalDateTime timestamp) {
        String shardIterator;
        try {
            shardIterator = this.kinesisSyncClient.getShardIterator((GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(streamName).shardId(shardId).shardIteratorType(ShardIteratorType.AT_TIMESTAMP).timestamp(timestamp.toInstant(ZoneOffset.UTC)).build()).shardIterator();
        }
        catch (ProvisionedThroughputExceededException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.ProvisionedThroughputExceededException(e);
        }
        catch (ResourceNotFoundException e) {
            throw new InvalidStreamException(e);
        }
        catch (InvalidArgumentException e) {
            if (e.getMessage().contains("The timestampInMillis parameter cannot be greater than the currentTimestampInMillis.")) {
                throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException("The timestamp parameter has to be older than the current time.");
            }
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException(e);
        }
        catch (Exception e) {
            throw this.wrapException(e);
        }
        String sequenceNumber = this.getSequenceNumber(shardIterator);
        return Objects.nonNull(sequenceNumber) ? sequenceNumber : AbsolutePosition.LATEST.toString();
    }

    private String getSequenceNumberWithRetry(String streamName, String shardId, String afterSequenceNumber) {
        String shardIterator;
        try {
            shardIterator = this.kinesisSyncClient.getShardIterator((GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(streamName).shardId(shardId).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(afterSequenceNumber).build()).shardIterator();
        }
        catch (ProvisionedThroughputExceededException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.ProvisionedThroughputExceededException(e);
        }
        catch (InvalidArgumentException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException(e);
        }
        catch (ResourceNotFoundException e) {
            throw new InvalidStreamException(e);
        }
        return this.getSequenceNumber(shardIterator);
    }

    private String getSequenceNumber(String shardIterator) {
        GetRecordsResponse getRecordsResponse;
        try {
            getRecordsResponse = this.kinesisSyncClient.getRecords((GetRecordsRequest)GetRecordsRequest.builder().shardIterator(shardIterator).build());
            while (getRecordsResponse.records().isEmpty() && getRecordsResponse.millisBehindLatest() > 0L) {
                getRecordsResponse = this.kinesisSyncClient.getRecords((GetRecordsRequest)GetRecordsRequest.builder().shardIterator(getRecordsResponse.nextShardIterator()).build());
            }
        }
        catch (software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException e) {
            throw new InvalidStreamException(e);
        }
        catch (InvalidArgumentException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException(e);
        }
        catch (ProvisionedThroughputExceededException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.ProvisionedThroughputExceededException(e);
        }
        catch (KmsAccessDeniedException | KmsDisabledException | KmsInvalidStateException | KmsNotFoundException | KmsOptInRequiredException | KmsThrottlingException e) {
            throw new KeyManagementServiceException(e);
        }
        return !getRecordsResponse.records().isEmpty() ? ((Record)getRecordsResponse.records().get(0)).sequenceNumber() : null;
    }

    protected List<String> getNeverLeasedShards(List<String> requestedShards) throws ExecutionException, InterruptedException {
        ArrayList attributeValues = new ArrayList();
        requestedShards.forEach(shardId -> {
            AttributeValue attributeValue = (AttributeValue)AttributeValue.builder().s(shardId).build();
            attributeValues.add(attributeValue);
        });
        Condition condition = (Condition)Condition.builder().attributeValueList(attributeValues).comparisonOperator(ComparisonOperator.IN).build();
        HashMap<String, Condition> conditionMap = new HashMap<String, Condition>();
        conditionMap.put(LEASE_KEY, condition);
        ScanRequest scanRequest = (ScanRequest)ScanRequest.builder().scanFilter(conditionMap).tableName(this.leaseTableName).build();
        this.waitForTableCreation(this.leaseTableName);
        ScanResponse scanResponse = (ScanResponse)this.dynamoClient.scan(scanRequest).get();
        if (!scanResponse.hasItems() || scanResponse.items().isEmpty()) {
            return requestedShards;
        }
        List databaseShardIds = scanResponse.items().stream().map(map -> ((AttributeValue)map.get(LEASE_KEY)).s()).collect(Collectors.toList());
        return requestedShards.stream().filter(shardId -> !databaseShardIds.contains(shardId)).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForTableCreation(String tableName) throws InterruptedException {
        long sleep = 500L;
        int numberOfRetries = 5;
        for (int i = 0; i < 5; ++i) {
            try {
                DescribeTableResponse describeTableResponse = (DescribeTableResponse)this.dynamoClient.describeTable((DescribeTableRequest)DescribeTableRequest.builder().tableName(tableName).build()).get();
                if (describeTableResponse.table().tableStatus().equals((Object)TableStatus.ACTIVE)) break;
                if (i == 4) {
                    throw new IllegalStateException(String.format("Table '%s' wasn't created (table status - '%s') after %s retries.", tableName, describeTableResponse.table().tableStatus(), 5));
                }
                logger.debug("Table '{}' isn't yet created, table status '{}', retrying in {} milliseconds.", new Object[]{tableName, describeTableResponse.table().tableStatus(), sleep});
                continue;
            }
            catch (ExecutionException e) {
                if (i == 4) {
                    throw new IllegalStateException(String.format("Couldn't retrieve status of table '%s' after %s retries", tableName, 5), e.getCause());
                }
                logger.debug("Couldn't retrieve status of table '{}', retrying in {} milliseconds. Cause: {}", new Object[]{tableName, sleep, e.getCause().getMessage()});
                continue;
            }
            finally {
                Thread.sleep(sleep);
                sleep *= 2L;
            }
        }
    }

    private boolean isSequenceNumberValid(Shard shard, ShardConfig shardConfig) {
        ShardInitialPosition initPosition = shardConfig.getShardInitialPosition();
        if (Objects.nonNull(initPosition.getAfterSequenceNumber()) || Objects.nonNull(initPosition.getAtSequenceNumber())) {
            BigInteger sequenceNumber = Objects.nonNull(initPosition.getAfterSequenceNumber()) ? this.parseSequenceNumber(initPosition.getAfterSequenceNumber()).add(BigInteger.ONE) : this.parseSequenceNumber(initPosition.getAtSequenceNumber());
            BigInteger startSeqNum = new BigInteger(shard.sequenceNumberRange().startingSequenceNumber());
            if (Objects.isNull(shard.sequenceNumberRange().endingSequenceNumber())) {
                return startSeqNum.compareTo(sequenceNumber) <= 0;
            }
            BigInteger endSeqNum = new BigInteger(shard.sequenceNumberRange().endingSequenceNumber());
            return startSeqNum.compareTo(sequenceNumber) <= 0 && endSeqNum.compareTo(sequenceNumber) >= 0;
        }
        return true;
    }

    private MuleRuntimeException wrapException(Throwable e) {
        if (e.getMessage().contains("The security token included in the request is invalid.") || e.getMessage().contains("The request signature we calculated does not match the signature you provided.")) {
            return new UnauthorizedException(e);
        }
        if (e.getMessage().contains("The timestampInMillis parameter cannot be greater than the currentTimestampInMillis.")) {
            return new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException("The timestamp parameter has to be older than the current time.");
        }
        return new KinesisRuntimeException(e);
    }

    private void createTable(KINESIS_METADATA_TABLE_TYPE tableType, String tableName, CommonListenerParameters commonParams) {
        List<Object> keySchemaElements = null;
        List<Object> attributeDefinitions = null;
        switch (tableType) {
            case LEASE_TABLE: {
                DynamoDBLeaseSerializer serializer = new DynamoDBLeaseSerializer();
                keySchemaElements = serializer.getKeySchema();
                attributeDefinitions = serializer.getAttributeDefinitions();
                break;
            }
            case COORDINATOR_STATE_TABLE: {
                keySchemaElements = Collections.singletonList(KeySchemaElement.builder().attributeName("key").keyType(KeyType.HASH).build());
                attributeDefinitions = Collections.singletonList(AttributeDefinition.builder().attributeName("key").attributeType(ScalarAttributeType.S).build());
                break;
            }
            case WORKER_MATRIX_STATS_TABLE: {
                keySchemaElements = WorkerMetricStats.getKeySchema();
                attributeDefinitions = WorkerMetricStats.getAttributeDefinitions();
            }
        }
        ProvisionedThroughput throughput = (ProvisionedThroughput)ProvisionedThroughput.builder().readCapacityUnits(Long.valueOf(commonParams.getReadCapacityUnits())).writeCapacityUnits(Long.valueOf(commonParams.getWriteCapacityUnits())).build();
        CreateTableRequest request = (CreateTableRequest)CreateTableRequest.builder().tableName(tableName).keySchema(keySchemaElements).attributeDefinitions(attributeDefinitions).provisionedThroughput(throughput).build();
        try {
            this.dynamoClient.createTable(request).get();
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof LimitExceededException) {
                throw new DynamoDbThroughputExceededException(e.getCause());
            }
            if (e.getCause() instanceof ResourceInUseException) {
                throw new DynamoDbResourceInUseException(e.getCause());
            }
            throw this.wrapException(e.getCause());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KinesisRuntimeException(e);
        }
    }

    private BigInteger parseSequenceNumber(String seqNumber) {
        BigInteger parsedNumber;
        try {
            parsedNumber = new BigInteger(seqNumber);
        }
        catch (NumberFormatException e) {
            throw new com.mulesoft.connector.amazon.kinesis.internal.error.exception.InvalidArgumentException(String.format("SequenceNumber '%s' is not a valid number.", seqNumber));
        }
        return parsedNumber;
    }

    private static enum KINESIS_METADATA_TABLE_TYPE {
        LEASE_TABLE,
        COORDINATOR_STATE_TABLE,
        WORKER_MATRIX_STATS_TABLE;

    }
}

