/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.aws.inbound.kinesis.ShardCheckpointer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

@ManagedResource
@IntegrationManagedResource
public class KinesisMessageDrivenChannelAdapter
extends MessageProducerSupport
implements DisposableBean {
    // Per-thread holder for error-message attributes: written by setAttributesIfNecessary()
    // and read back by getErrorMessageAttributes().
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal();
    private final AmazonKinesis amazonKinesis;
    // Stream names to consume; null when the adapter was created from explicit shard offsets.
    private final String[] streams;
    // Known shard offsets. Plain HashSet: most accesses in this class synchronize on it.
    private final Set<KinesisShardOffset> shardOffsets = new HashSet<KinesisShardOffset>();
    // Consumers registered by populateConsumer(), keyed by their shard offset.
    private final Map<KinesisShardOffset, ShardConsumer> shardConsumers = new ConcurrentHashMap<KinesisShardOffset, ShardConsumer>();
    // Stream names removed at the end of populateShardsForStream(); where they are added is not visible here.
    private final Set<String> inResharding = new ConcurrentSkipListSet<String>();
    // Active invokers. Plain ArrayList: populateConsumer() and ConsumerInvoker.run() synchronize on it.
    private final List<ConsumerInvoker> consumerInvokers = new ArrayList<ConsumerInvoker>();
    private final ShardConsumerManager shardConsumerManager = new ShardConsumerManager();
    // Single-thread executor that runs the ShardConsumerManager (submitted in doStart()).
    // NOTE(review): getComponentName() is evaluated at field-initialisation time, so the
    // thread-name prefix probably never contains the bean name - confirm.
    private final ExecutorService shardLocksExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new CustomizableThreadFactory((this.getComponentName() == null ? "" : this.getComponentName()) + "-kinesis-shard-locks-"));
    // First segment of every checkpoint/lock key (see buildCheckpointKeyForShard()).
    private String consumerGroup = "SpringIntegration";
    private ConcurrentMetadataStore checkpointStore = new SimpleMetadataStore();
    // The two executors below get a default cached thread pool in onInit() when left null;
    // the "ExplicitlySet" flags tell destroy() not to shut down caller-supplied executors.
    private Executor dispatcherExecutor;
    private boolean dispatcherExecutorExplicitlySet;
    private Executor consumerExecutor;
    private boolean consumerExecutorExplicitlySet;
    // Value given to setConcurrency(); upper bound on the number of ConsumerInvokers.
    private int maxConcurrency;
    // Effective concurrency computed in doStart(): min(maxConcurrency, number of shard offsets).
    private int concurrency;
    // Template offset copied for every shard discovered in populateShardsForStream().
    private KinesisShardOffset streamInitialSequence = KinesisShardOffset.latest();
    private Converter<byte[], Object> converter = new DeserializingConverter();
    private ListenerMode listenerMode = ListenerMode.record;
    private CheckpointMode checkpointMode = CheckpointMode.batch;
    private long checkpointsInterval = 5000L;
    // Capped at 10000 by setRecordsLimit().
    private int recordsLimit = 10000;
    // Floors enforced by the setters: idleBetweenPolls >= 250, both back-offs >= 1000.
    private int idleBetweenPolls = 1000;
    private int consumerBackoff = 1000;
    // Milliseconds populateShardsForStreams() waits for all streams to be described.
    private int startTimeout = 60000;
    // Milliseconds slept between describeStream attempts, and the number of attempts tolerated.
    private int describeStreamBackoff = 1000;
    private int describeStreamRetries = 50;
    // Copied onto each shard offset by populateConsumer(); cleared at the end of populateConsumers().
    private boolean resetCheckpoints;
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;
    // Optional; forced to null in onInit() when explicit shard offsets are configured.
    private LockRegistry lockRegistry;
    private boolean bindSourceRecord;
    // True between doStart() and the end of doStop(); polled by ConsumerInvoker.run().
    private volatile boolean active;
    // Per-invoker consumer capacity, raised by populateConsumer() when every invoker is full.
    private volatile int consumerInvokerMaxCapacity;
    // Handle of the running ShardConsumerManager task; cancelled in doStop().
    private volatile Future<?> shardConsumerManagerFuture;

    /**
     * Creates an adapter that consumes all shards of the given streams.
     *
     * @param amazonKinesis the Kinesis client; must not be null
     * @param streams the stream names; must not be null or empty
     */
    public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, String ... streams) {
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
        Assert.notEmpty(streams, "'streams' must not be null.");
        this.amazonKinesis = amazonKinesis;
        // Defensive copy: later changes to the caller's array are not observed.
        this.streams = streams.clone();
    }

    /**
     * Creates an adapter that consumes an explicit set of shards.
     * Each offset is copied, so the caller's instances are not retained.
     *
     * @param amazonKinesis the Kinesis client; must not be null
     * @param shardOffsets the shards to consume; non-empty, no null elements, each with
     *        both a stream and a shard
     */
    public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, KinesisShardOffset ... shardOffsets) {
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
        Assert.notEmpty(shardOffsets, "'shardOffsets' must not be null.");
        Assert.noNullElements(shardOffsets, "'shardOffsets' must not contain null elements.");
        for (KinesisShardOffset candidate : shardOffsets) {
            boolean fullyQualified = StringUtils.hasText(candidate.getStream()) && StringUtils.hasText(candidate.getShard());
            Assert.isTrue(fullyQualified, "The 'shardOffsets' must be provided with particular 'stream' and 'shard' values.");
            this.shardOffsets.add(new KinesisShardOffset(candidate));
        }
        this.amazonKinesis = amazonKinesis;
        // No stream-level configuration in this mode; onInit()/doStart() check for null.
        this.streams = null;
    }

    /**
     * Sets the consumer group name, which is the first segment of every checkpoint key.
     *
     * @param consumerGroup the group name; must contain text
     */
    public void setConsumerGroup(String consumerGroup) {
        Assert.hasText(consumerGroup, "'consumerGroup' must not be empty");
        this.consumerGroup = consumerGroup;
    }

    /**
     * Sets the metadata store used to persist shard checkpoints.
     *
     * @param checkpointStore the store; must not be null
     */
    public void setCheckpointStore(ConcurrentMetadataStore checkpointStore) {
        Assert.notNull(checkpointStore, "'checkpointStore' must not be null");
        this.checkpointStore = checkpointStore;
    }

    /**
     * Sets the executor on which ConsumerInvokers are run.
     * A caller-supplied executor is not shut down by {@link #destroy()}.
     *
     * @param executor the executor; must not be null
     */
    public void setConsumerExecutor(Executor executor) {
        Assert.notNull(executor, "'executor' must not be null");
        this.consumerExecutorExplicitlySet = true;
        this.consumerExecutor = executor;
    }

    /**
     * Sets the executor used for the consumer dispatcher and for describing streams.
     * A non-null executor is treated as caller-owned and is not shut down by
     * {@link #destroy()}.
     *
     * @param dispatcherExecutor the executor, or null to fall back to the default
     *        cached thread pool created in onInit()
     */
    public void setDispatcherExecutor(Executor dispatcherExecutor) {
        this.dispatcherExecutor = dispatcherExecutor;
        // Only a real executor counts as caller-owned. Previously a null argument also set
        // this flag, so the default pool created later in onInit() was never shut down.
        this.dispatcherExecutorExplicitlySet = dispatcherExecutor != null;
    }

    /**
     * Sets the template offset that is copied for every shard discovered in a stream.
     *
     * @param streamInitialSequence the template offset; must not be null
     */
    public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence) {
        Assert.notNull(streamInitialSequence, "'streamInitialSequence' must not be null");
        this.streamInitialSequence = streamInitialSequence;
    }

    /**
     * Replaces the byte[] converter; the default is a {@code DeserializingConverter}.
     * No null check is performed here.
     * NOTE(review): presumably applied to record payloads - the usage is outside this view.
     *
     * @param converter the converter to use
     */
    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    /**
     * Sets the listener mode (default {@code ListenerMode.record}).
     *
     * @param listenerMode the mode; must not be null
     */
    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    /**
     * Sets the checkpoint mode (default {@code CheckpointMode.batch}).
     * doStart() overrides {@code record} with {@code batch} when the listener mode is batch.
     *
     * @param checkpointMode the mode; must not be null
     */
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    /**
     * Sets the checkpoints interval (default 5000). The value is stored without validation.
     * NOTE(review): presumably milliseconds between periodic checkpoints - the usage is
     * outside this view.
     *
     * @param checkpointsInterval the interval
     */
    public void setCheckpointsInterval(long checkpointsInterval) {
        this.checkpointsInterval = checkpointsInterval;
    }

    /**
     * Sets the records limit, silently capped at 10000.
     *
     * @param recordsLimit the limit; must be greater than 0
     */
    public void setRecordsLimit(int recordsLimit) {
        Assert.isTrue(recordsLimit > 0, "'recordsLimit' must be more than 0");
        this.recordsLimit = recordsLimit > 10000 ? 10000 : recordsLimit;
    }

    /**
     * Sets the consumer back-off; values below 1000 are raised to 1000.
     *
     * @param consumerBackoff the requested back-off
     */
    public void setConsumerBackoff(int consumerBackoff) {
        this.consumerBackoff = consumerBackoff < 1000 ? 1000 : consumerBackoff;
    }

    /**
     * Sets the sleep, in milliseconds, between describeStream attempts.
     * Values below 1000 are raised to 1000.
     *
     * @param describeStreamBackoff the requested back-off
     */
    public void setDescribeStreamBackoff(int describeStreamBackoff) {
        this.describeStreamBackoff = describeStreamBackoff < 1000 ? 1000 : describeStreamBackoff;
    }

    /**
     * Sets how many describeStream attempts are tolerated before giving up on a stream.
     *
     * @param describeStreamRetries the retry count; must be greater than 0
     */
    public void setDescribeStreamRetries(int describeStreamRetries) {
        boolean positive = describeStreamRetries > 0;
        Assert.isTrue(positive, "'describeStreamRetries' must be more than 0");
        this.describeStreamRetries = describeStreamRetries;
    }

    /**
     * Sets how long, in milliseconds, start-up waits for all streams to be described.
     *
     * @param startTimeout the timeout; must be greater than 0
     */
    public void setStartTimeout(int startTimeout) {
        boolean positive = startTimeout > 0;
        Assert.isTrue(positive, "'startTimeout' must be more than 0");
        this.startTimeout = startTimeout;
    }

    /**
     * Sets the maximum number of ConsumerInvokers. The value is stored as
     * {@code maxConcurrency}; doStart() derives the effective concurrency as the smaller
     * of this value and the number of shard offsets. The value is not validated.
     *
     * @param concurrency the maximum concurrency
     */
    public void setConcurrency(int concurrency) {
        this.maxConcurrency = concurrency;
    }

    /**
     * Sets the idle time between polls; values below 250 are raised to 250.
     *
     * @param idleBetweenPolls the requested idle time
     */
    public void setIdleBetweenPolls(int idleBetweenPolls) {
        this.idleBetweenPolls = idleBetweenPolls < 250 ? 250 : idleBetweenPolls;
    }

    /**
     * Sets the mapper for byte[] payloads; null by default and not validated here.
     * NOTE(review): presumably used to extract headers embedded in the record payload -
     * the usage is outside this view.
     *
     * @param embeddedHeadersMapper the mapper to use
     */
    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> embeddedHeadersMapper) {
        this.embeddedHeadersMapper = embeddedHeadersMapper;
    }

    /**
     * Sets the lock registry. When present, the ShardConsumerManager obtains a lock per
     * checkpoint key and starts a shard consumer only after {@code tryLock()} succeeds.
     * onInit() discards the registry, with a warning, when explicit shard offsets are
     * configured.
     *
     * @param lockRegistry the registry, or null for no locking
     */
    public void setLockRegistry(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    /**
     * Sets the {@code bindSourceRecord} flag (false by default).
     * NOTE(review): presumably controls whether the raw Kinesis record is attached to
     * produced messages - the usage is outside this view.
     *
     * @param bindSourceRecord the flag value
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    /**
     * Creates default cached thread pools for any executor that was not supplied and
     * drops the lock registry when explicit shard offsets are configured.
     */
    protected void onInit() {
        super.onInit();
        if (this.consumerExecutor == null) {
            String componentName = this.getComponentName();
            String threadPrefix = (componentName == null ? "" : componentName) + "-kinesis-consumer-";
            this.consumerExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory(threadPrefix));
        }
        if (this.dispatcherExecutor == null) {
            String componentName = this.getComponentName();
            String threadPrefix = (componentName == null ? "" : componentName) + "-kinesis-dispatcher-";
            this.dispatcherExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory(threadPrefix));
        }
        if (this.streams != null) {
            return;
        }
        // Explicit shard configuration: locking is not applied in this mode.
        if (this.lockRegistry != null) {
            this.logger.warn("The LockRegistry is ignored when explicit shards configuration is used.");
        }
        this.lockRegistry = null;
    }

    /**
     * Releases the thread pools owned by this adapter. Executors supplied through the
     * setters are left alone; the internally created ones are shut down.
     */
    public void destroy() {
        // The instanceof guards also cover the case where onInit() never ran, so the
        // default executors are still null.
        if (!this.consumerExecutorExplicitlySet && this.consumerExecutor instanceof ExecutorService) {
            ((ExecutorService) this.consumerExecutor).shutdown();
        }
        if (!this.dispatcherExecutorExplicitlySet && this.dispatcherExecutor instanceof ExecutorService) {
            ((ExecutorService) this.dispatcherExecutor).shutdown();
        }
        // The shard-locks executor is always created by this class and was never shut down,
        // leaking its thread. shutdownNow() interrupts a still-running ShardConsumerManager
        // so that its finally block releases any held locks.
        this.shardLocksExecutor.shutdownNow();
    }

    /**
     * Stops and unregisters the consumer for the given shard, if one is registered.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void stopConsumer(String stream, String shard) {
        ShardConsumer shardConsumer = this.shardConsumers.remove(KinesisShardOffset.latest(stream, shard));
        if (shardConsumer != null) {
            shardConsumer.stop();
        } else if (this.logger.isDebugEnabled()) {
            // Fixed: the stream placeholder used to print the shard id a second time.
            this.logger.debug("There is no ShardConsumer for shard [" + shard + "] in stream [" + stream + "] to stop.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues the given shard for consumption unless a consumer is already registered.
     * Only shards present in the known shard offsets are queued.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void startConsumer(String stream, String shard) {
        KinesisShardOffset lookupKey = KinesisShardOffset.latest(stream, shard);
        ShardConsumer existing = this.shardConsumers.get(lookupKey);
        if (existing != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("The [" + existing + "] has been started before.");
            }
            return;
        }
        synchronized (this.shardOffsets) {
            for (KinesisShardOffset known : this.shardOffsets) {
                if (lookupKey.equals(known)) {
                    // Queue the stored instance rather than the lookup key.
                    this.shardConsumerManager.addShardToConsume(known);
                    break;
                }
            }
        }
    }

    /**
     * Restarts the shard's consumer from a LATEST offset.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void resetCheckpointForShardToLatest(String stream, String shard) {
        KinesisShardOffset latestOffset = KinesisShardOffset.latest(stream, shard);
        this.restartShardConsumerForOffset(latestOffset);
    }

    /**
     * Restarts the shard's consumer from a TRIM_HORIZON offset.
     *
     * @param stream the stream name
     * @param shard the shard id
     */
    @ManagedOperation
    public void resetCheckpointForShardToTrimHorizon(String stream, String shard) {
        KinesisShardOffset trimHorizonOffset = KinesisShardOffset.trimHorizon(stream, shard);
        this.restartShardConsumerForOffset(trimHorizonOffset);
    }

    /**
     * Restarts the shard's consumer at the given sequence number.
     *
     * @param stream the stream name
     * @param shard the shard id
     * @param sequenceNumber the sequence number to start at
     */
    @ManagedOperation
    public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber) {
        KinesisShardOffset sequenceOffset = KinesisShardOffset.atSequenceNumber(stream, shard, sequenceNumber);
        this.restartShardConsumerForOffset(sequenceOffset);
    }

    /**
     * Restarts the shard's consumer at the given point in time.
     *
     * @param stream the stream name
     * @param shard the shard id
     * @param timestamp the start time, passed to {@code new Date(long)} (epoch milliseconds)
     */
    @ManagedOperation
    public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp) {
        Date startTime = new Date(timestamp);
        KinesisShardOffset timestampOffset = KinesisShardOffset.atTimestamp(stream, shard, startTime);
        this.restartShardConsumerForOffset(timestampOffset);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the stored offset for a shard with the given one and, when the adapter is
     * active, closes the current consumer and queues the shard again with its reset flag set.
     *
     * @param shardOffset the new offset; its shard must already be known to this adapter
     */
    private void restartShardConsumerForOffset(KinesisShardOffset shardOffset) {
        boolean operated = this.shardOffsets.contains(shardOffset);
        Assert.isTrue(operated, "The [" + this + "] doesn't operate shard [" + shardOffset.getShard() + "] for stream [" + shardOffset.getStream() + "]");
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Resetting consumer for [" + shardOffset + "]...");
        }
        shardOffset.reset();
        synchronized (this.shardOffsets) {
            // Remove-then-add swaps the stored (equal) instance for the new one.
            this.shardOffsets.remove(shardOffset);
            this.shardOffsets.add(shardOffset);
        }
        if (!this.active) {
            return;
        }
        ShardConsumer previous = this.shardConsumers.remove(shardOffset);
        if (previous != null) {
            previous.close();
        }
        shardOffset.setReset(true);
        this.shardConsumerManager.addShardToConsume(shardOffset);
    }

    /**
     * Flags all checkpoints for reset. When the adapter is active, every consumer is
     * stopped and all known shards are queued again.
     */
    @ManagedOperation
    public void resetCheckpoints() {
        this.resetCheckpoints = true;
        if (!this.active) {
            return;
        }
        this.stopConsumers();
        this.populateConsumers();
    }

    /**
     * Starts the adapter: normalises the checkpoint mode, discovers shards for configured
     * streams, queues every shard, then launches the dispatcher and the shard consumer manager.
     */
    protected void doStart() {
        super.doStart();
        boolean batchListener = ListenerMode.batch.equals(this.listenerMode);
        if (batchListener && CheckpointMode.record.equals(this.checkpointMode)) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        if (this.streams != null) {
            this.populateShardsForStreams();
        }
        this.populateConsumers();
        this.active = true;
        int knownShards = this.shardOffsets.size();
        this.concurrency = Math.min(this.maxConcurrency, knownShards);
        Runnable dispatcher = new ConsumerDispatcher();
        this.dispatcherExecutor.execute(dispatcher);
        Runnable manager = this.shardConsumerManager;
        this.shardConsumerManagerFuture = this.shardLocksExecutor.submit(manager);
    }

    /**
     * Returns the slice of the current shard consumers assigned to invoker {@code i}.
     * With a concurrency of 1 all consumers are returned; when there are exactly as many
     * consumers as invokers, invoker {@code i} gets consumer {@code i}; otherwise consumers
     * are split into equal chunks and the last invoker also takes the remainder.
     *
     * @param i zero-based invoker index
     * @return the consumers for that invoker
     */
    private Collection<ShardConsumer> shardConsumerSubset(int i) {
        List<ShardConsumer> snapshot = new ArrayList<>(this.shardConsumers.values());
        if (this.concurrency == 1) {
            return snapshot;
        }
        int total = snapshot.size();
        if (total == this.concurrency) {
            return Collections.singleton(snapshot.get(i));
        }
        int chunk = total / this.concurrency;
        int from = i * chunk;
        if (i == this.concurrency - 1) {
            // Last invoker: take everything that is left, including the remainder.
            return snapshot.subList(from, total);
        }
        return snapshot.subList(from, (i + 1) * chunk);
    }

    /**
     * Clears the known shard offsets, starts shard discovery for every configured stream
     * and waits up to {@code startTimeout} milliseconds for all of them to finish.
     *
     * @throws IllegalStateException if discovery does not finish in time or the waiting
     *         thread is interrupted
     */
    private void populateShardsForStreams() {
        this.shardOffsets.clear();
        CountDownLatch shardsGatherLatch = new CountDownLatch(this.streams.length);
        for (String stream : this.streams) {
            this.populateShardsForStream(stream, shardsGatherLatch);
        }
        try {
            if (!shardsGatherLatch.await(this.startTimeout, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("The [ " + this + "] could not start during timeout: " + this.startTimeout);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and keep the cause, as the other interrupt
            // handlers in this class do.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("The [ " + this + "] has been interrupted from start.", e);
        }
    }

    /**
     * Asynchronously, on the dispatcher executor, pages through the shards of a stream and
     * registers a shard offset for each one that still has to be consumed. Closed shards
     * whose checkpoint has reached their ending sequence number are skipped.
     *
     * @param stream the stream to describe
     * @param shardsGatherLatch counted down when discovery ends, successfully or not; when
     *        null and the adapter is active, newly added offsets are queued for consumption
     *        immediately
     */
    private void populateShardsForStream(String stream, CountDownLatch shardsGatherLatch) {
        this.dispatcherExecutor.execute(() -> {
            try {
                int describeStreamRetries = 0;
                List<Shard> shardsToConsume = new ArrayList<>();
                String exclusiveStartShardId = null;
                while (true) {
                    DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(stream).withExclusiveStartShardId(exclusiveStartShardId);
                    DescribeStreamResult describeStreamResult = null;
                    try {
                        describeStreamResult = this.amazonKinesis.describeStream(describeStreamRequest);
                    }
                    catch (Exception e) {
                        this.logger.info("Got an exception when describing stream [" + stream + "]. Backing off for [" + this.describeStreamBackoff + "] millis.", e);
                    }
                    if (describeStreamResult == null || !StreamStatus.ACTIVE.toString().equals(describeStreamResult.getStreamDescription().getStreamStatus())) {
                        if (describeStreamRetries++ > this.describeStreamRetries) {
                            ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException("The stream [" + stream + "] isn't ACTIVE or doesn't exist.");
                            resourceNotFoundException.setServiceName("Kinesis");
                            throw resourceNotFoundException;
                        }
                        try {
                            Thread.sleep(this.describeStreamBackoff);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException("The [describeStream] thread for the stream [" + stream + "] has been interrupted.", e);
                        }
                        // Retry the describe call. Without this the code fell through and
                        // dereferenced a null result or read shards of a non-ACTIVE stream.
                        continue;
                    }
                    List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
                    try {
                        for (Shard shard : shards) {
                            String key = this.buildCheckpointKeyForShard(stream, shard.getShardId());
                            String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
                            if (endingSequenceNumber != null) {
                                // An ending sequence number marks a closed shard; skip it once
                                // the stored checkpoint has reached that number.
                                String checkpoint = this.checkpointStore.get(key);
                                boolean skipClosedShard = checkpoint != null && new BigInteger(endingSequenceNumber).compareTo(new BigInteger(checkpoint)) <= 0;
                                if (this.logger.isTraceEnabled()) {
                                    this.logger.trace("The shard [" + shard + "] in stream [" + stream + "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber + "].\nThe last processed checkpoint is [" + checkpoint + "]." + (skipClosedShard ? "\nThe shard will be skipped." : ""));
                                }
                                if (skipClosedShard) {
                                    continue;
                                }
                            }
                            shardsToConsume.add(shard);
                        }
                    }
                    catch (Exception e) {
                        // NOTE(review): this retries the same page immediately, with no back-off.
                        this.logger.info("Got an exception when processing shards in stream [" + stream + "].\nRetrying...", e);
                        continue;
                    }
                    if (!describeStreamResult.getStreamDescription().getHasMoreShards().booleanValue()) {
                        break;
                    }
                    // Next page starts after the last shard of this one.
                    exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
                    describeStreamRetries = 0;
                }
                for (Shard shard : shardsToConsume) {
                    KinesisShardOffset shardOffset = new KinesisShardOffset(this.streamInitialSequence);
                    shardOffset.setShard(shard.getShardId());
                    shardOffset.setStream(stream);
                    boolean addedOffset;
                    synchronized (this.shardOffsets) {
                        addedOffset = this.shardOffsets.add(shardOffset);
                    }
                    // During start-up (latch present) doStart() queues all offsets itself.
                    if (addedOffset && shardsGatherLatch == null && this.active) {
                        this.shardConsumerManager.addShardToConsume(shardOffset);
                    }
                }
            }
            finally {
                if (shardsGatherLatch != null) {
                    shardsGatherLatch.countDown();
                }
                this.inResharding.remove(stream);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues every known shard offset with the shard consumer manager and then clears
     * the pending reset-checkpoints flag.
     */
    private void populateConsumers() {
        synchronized (this.shardOffsets) {
            this.shardOffsets.forEach(this.shardConsumerManager::addShardToConsume);
        }
        this.resetCheckpoints = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a ShardConsumer for the given offset, registers it in {@code shardConsumers}
     * and, when the adapter is active, attaches it to a ConsumerInvoker. A new invoker is
     * started while fewer than {@code maxConcurrency} exist; otherwise the first invoker
     * with spare capacity is used, and if none has any, the first invoker takes the
     * consumer and the per-invoker capacity is raised to its new size.
     *
     * @param shardOffset the shard offset to consume
     */
    private void populateConsumer(KinesisShardOffset shardOffset) {
        shardOffset.setReset(this.resetCheckpoints);
        ShardConsumer shardConsumer = new ShardConsumer(shardOffset);
        if (this.active) {
            synchronized (this.consumerInvokers) {
                if (this.consumerInvokers.size() < this.maxConcurrency) {
                    ConsumerInvoker consumerInvoker = new ConsumerInvoker(Collections.singleton(shardConsumer));
                    this.consumerInvokers.add(consumerInvoker);
                    this.consumerExecutor.execute(consumerInvoker);
                } else {
                    // Use a flag and break instead of returning from the loop: the early
                    // return skipped the registration below, leaving a running consumer
                    // that stopConsumer()/stopConsumers() could never find.
                    boolean consumerAdded = false;
                    for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
                        if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) {
                            consumerInvoker.addConsumer(shardConsumer);
                            consumerAdded = true;
                            break;
                        }
                    }
                    if (!consumerAdded && this.concurrency != 0) {
                        ConsumerInvoker firstConsumerInvoker = this.consumerInvokers.get(0);
                        firstConsumerInvoker.addConsumer(shardConsumer);
                        this.consumerInvokerMaxCapacity = firstConsumerInvoker.consumers.size();
                    }
                }
            }
        }
        this.shardConsumers.put(shardOffset, shardConsumer);
    }

    /**
     * Builds the checkpoint (and lock) key for a shard: {@code consumerGroup:stream:shardId}.
     *
     * @param stream the stream name
     * @param shardId the shard id
     * @return the colon-separated key
     */
    private String buildCheckpointKeyForShard(String stream, String shardId) {
        StringBuilder key = new StringBuilder();
        key.append(this.consumerGroup).append(":");
        key.append(stream).append(":");
        key.append(shardId);
        return key.toString();
    }

    /**
     * Stops the adapter: wakes every ConsumerInvoker, stops all shard consumers, cancels
     * the shard consumer manager task and finally marks the adapter inactive.
     */
    protected void doStop() {
        // Every other access to consumerInvokers holds its monitor, and invoker threads
        // remove themselves from the list; unguarded iteration here could throw
        // ConcurrentModificationException.
        synchronized (this.consumerInvokers) {
            for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
                consumerInvoker.notifyBarrier();
            }
        }
        super.doStop();
        this.stopConsumers();
        // The future is only assigned at the very end of doStart(); tolerate its absence.
        Future<?> managerFuture = this.shardConsumerManagerFuture;
        if (managerFuture != null) {
            managerFuture.cancel(true);
        }
        this.active = false;
    }

    /**
     * Stops every registered shard consumer and empties the registry.
     */
    private void stopConsumers() {
        this.shardConsumers.values().forEach(ShardConsumer::stop);
        this.shardConsumers.clear();
    }

    /**
     * When an error channel is configured, creates the error-message attributes for the
     * message, stores them in the thread-local holder and records the raw record under
     * the {@code aws_rawRecord} attribute. Does nothing otherwise.
     *
     * @param record the raw record object to expose to error handling
     * @param message the message built from that record
     */
    private void setAttributesIfNecessary(Object record, Message<?> message) {
        if (this.getErrorChannel() == null) {
            return;
        }
        AttributeAccessor accessor = ErrorMessageUtils.getAttributeAccessor(message, null);
        attributesHolder.set(accessor);
        accessor.setAttribute("aws_rawRecord", record);
    }

    /**
     * Returns the attributes stored for the current thread by setAttributesIfNecessary(),
     * falling back to the superclass implementation when none are stored.
     *
     * @param message the message being processed
     * @return the attribute accessor to use for error messages
     */
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor stored = attributesHolder.get();
        return stored != null ? stored : super.getErrorMessageAttributes(message);
    }

    /**
     * Describes this adapter by its shard offsets and consumer group.
     */
    public String toString() {
        StringBuilder text = new StringBuilder("KinesisMessageDrivenChannelAdapter{shardOffsets=");
        text.append(this.shardOffsets);
        text.append(", consumerGroup='").append(this.consumerGroup).append("'}");
        return text.toString();
    }

    private final class ShardConsumerManager
    implements SchedulingAwareRunnable {
        private final Map<String, KinesisShardOffset> shardOffsetsToConsumer = new ConcurrentHashMap<String, KinesisShardOffset>();
        private final Map<String, Lock> locks = new HashMap<String, Lock>();
        private final Queue<String> forUnlocking = new ConcurrentLinkedQueue<String>();

        ShardConsumerManager() {
        }

        void addShardToConsume(KinesisShardOffset kinesisShardOffset) {
            String lockKey = KinesisMessageDrivenChannelAdapter.this.buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard());
            this.shardOffsetsToConsumer.put(lockKey, kinesisShardOffset);
        }

        void unlock(String lockKey) {
            this.forUnlocking.add(lockKey);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void run() {
            Lock lock;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String lockKey;
                    this.shardOffsetsToConsumer.entrySet().removeIf(entry -> {
                        boolean remove = true;
                        if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
                            String key = (String)entry.getKey();
                            Lock lock = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain((Object)key);
                            try {
                                if (lock.tryLock()) {
                                    this.locks.put(key, lock);
                                } else {
                                    remove = false;
                                }
                            }
                            catch (Exception e) {
                                KinesisMessageDrivenChannelAdapter.this.logger.error((Object)("Error during locking: " + lock), (Throwable)e);
                            }
                        }
                        if (remove) {
                            KinesisMessageDrivenChannelAdapter.this.populateConsumer((KinesisShardOffset)entry.getValue());
                        }
                        return remove;
                    });
                    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && (lockKey = this.forUnlocking.poll()) != null) {
                        lock = this.locks.remove(lockKey);
                        if (lock == null) continue;
                        try {
                            lock.unlock();
                        }
                        catch (Exception e) {
                            KinesisMessageDrivenChannelAdapter.this.logger.error((Object)("Error during unlocking: " + lock), (Throwable)e);
                        }
                    }
                    try {
                        Thread.sleep(250L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted", e);
                        return;
                    }
                }
            }
            finally {
                Iterator<Lock> iterator = this.locks.values().iterator();
                while (iterator.hasNext()) {
                    lock = iterator.next();
                    try {
                        lock.unlock();
                    }
                    catch (Exception e) {
                        KinesisMessageDrivenChannelAdapter.this.logger.error((Object)("Error during unlocking: " + lock), (Throwable)e);
                    }
                    finally {
                        iterator.remove();
                    }
                }
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    private final class ConsumerInvoker
    implements SchedulingAwareRunnable {
        private final Queue<ShardConsumer> consumers = new ConcurrentLinkedQueue<ShardConsumer>();
        private final Semaphore processBarrier = new Semaphore(0);
        private final Runnable notifier = this::notifyBarrier;

        ConsumerInvoker(Collection<ShardConsumer> shardConsumers) {
            for (ShardConsumer shardConsumer : shardConsumers) {
                this.addConsumer(shardConsumer);
            }
        }

        void addConsumer(ShardConsumer shardConsumer) {
            shardConsumer.setNotifier(this.notifier);
            this.consumers.add(shardConsumer);
        }

        void notifyBarrier() {
            this.processBarrier.release();
        }

        /**
         * Main loop of this invoker. While the adapter is active it blocks on the process barrier
         * until {@code notifyBarrier()} releases a permit and then makes one pass over its
         * consumers: consumers in the {@code STOP} state are dropped from the collection, and the
         * pending task (if any) of every other consumer is run on this thread. Exceptions thrown by
         * a task are logged and do not end the loop. The loop ends when the adapter is no longer
         * active or when this invoker has no consumers left.
         * <p>
         * The invoker always removes itself from the adapter's {@code consumerInvokers} on the way
         * out, including when the thread is interrupted or a task fails with something that is not
         * an {@link Exception}, so an invoker whose thread has ended never stays registered.
         *
         * @throws IllegalStateException if the thread is interrupted while waiting on the barrier
         */
        public void run() {
            try {
                while (KinesisMessageDrivenChannelAdapter.this.active) {
                    try {
                        this.processBarrier.acquire();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("ConsumerInvoker thread [" + this + "] has been interrupted", e);
                    }
                    Iterator<ShardConsumer> iterator = this.consumers.iterator();
                    while (iterator.hasNext()) {
                        ShardConsumer shardConsumer = iterator.next();
                        if (ConsumerState.STOP == shardConsumer.state) {
                            iterator.remove();
                            continue;
                        }
                        // Read the volatile field once so the null check and the call are
                        // guaranteed to see the same reference.
                        Runnable task = shardConsumer.task;
                        if (task == null) {
                            continue;
                        }
                        try {
                            task.run();
                        }
                        catch (Exception e) {
                            KinesisMessageDrivenChannelAdapter.this.logger.info("Got an exception " + e + " during [" + shardConsumer + "] task invocation.\nProcess will be retried on the next iteration.");
                        }
                    }
                    synchronized (KinesisMessageDrivenChannelAdapter.this.consumerInvokers) {
                        // The emptiness check and the removal happen under the same lock, so the
                        // decision to exit cannot interleave with code that holds this lock.
                        if (this.consumers.isEmpty()) {
                            KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
                            break;
                        }
                    }
                }
            }
            finally {
                // Idempotent: a no-op when the invoker already removed itself above.
                synchronized (KinesisMessageDrivenChannelAdapter.this.consumerInvokers) {
                    KinesisMessageDrivenChannelAdapter.this.consumerInvokers.remove(this);
                }
            }
        }

        /**
         * Reports this runnable as long-lived.
         *
         * @return always {@code true}
         */
        public boolean isLongLived() {
            return true;
        }
    }

    /**
     * Lifecycle states of a {@code ShardConsumer}. {@code ShardConsumer.execute()} decides what
     * to do next by switching on the current state.
     */
    private static enum ConsumerState {
        /** Initial state of a consumer; {@code execute()} schedules a task that obtains a shard iterator. */
        NEW,
        /** Set when GetRecords fails with an expired iterator; handled by {@code execute()} exactly like {@code NEW}. */
        EXPIRED,
        /** A shard iterator is available (or a sleep has elapsed); {@code execute()} schedules the records-polling task. */
        CONSUME,
        /** Set by {@code prepareSleepState()}; {@code execute()} switches back to {@code CONSUME} once {@code sleepUntil} has passed. */
        SLEEP,
        /** Set by {@code ShardConsumer.stop()}; consumers in this state are removed by the invoker and the dispatcher. */
        STOP;

    }

    private final class ShardConsumer {
        // Private copy of the offset this consumer was created with; also supplies the stream and
        // shard names that performSend() puts into message headers.
        private final KinesisShardOffset shardOffset;
        // Checkpoint access for this shard, created over the adapter's checkpoint store and `key`.
        private final ShardCheckpointer checkpointer;
        // Wall-clock time (ms) after which processRecords() checkpoints again in periodic mode.
        private long nextCheckpointTimeInMillis;
        // The records-polling task; created once and reused whenever execute() sees the CONSUME state.
        private final Runnable processTask = this.processTask();
        // Checkpoint key built from the stream and shard names; also passed to
        // shardConsumerManager.unlock() in stop().
        private final String key;
        // Optional callback run by stop() and by execute() when a task becomes pending; set via setNotifier().
        private Runnable notifier;
        // Current lifecycle state; starts as NEW.
        private volatile ConsumerState state = ConsumerState.NEW;
        // Task waiting to be run, or null when nothing is pending; each task resets it to null when it ends.
        private volatile Runnable task;
        // Iterator for the next GetRecords call; becomes null when a GetRecords result carries no next iterator.
        private volatile String shardIterator;
        // Wall-clock time (ms) at which the SLEEP state ends.
        private volatile long sleepUntil;

        ShardConsumer(KinesisShardOffset shardOffset) {
            this.shardOffset = new KinesisShardOffset(shardOffset);
            this.key = KinesisMessageDrivenChannelAdapter.this.buildCheckpointKeyForShard(shardOffset.getStream(), shardOffset.getShard());
            this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, this.key);
        }

        /**
         * Installs the callback that {@code stop()} and {@code execute()} run to signal that this
         * consumer needs attention.
         *
         * @param notifier the callback to run
         */
        void setNotifier(Runnable notifier) {
            this.notifier = notifier;
        }

        /**
         * Moves this consumer to the {@code STOP} state. When the adapter has a lock registry, the
         * shard's key is unlocked through the shard consumer manager. Finally the notifier, if one
         * is set, is run.
         */
        void stop() {
            this.state = ConsumerState.STOP;

            boolean lockingConfigured = KinesisMessageDrivenChannelAdapter.this.lockRegistry != null;
            if (lockingConfigured) {
                KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(this.key);
            }

            Runnable wakeUp = this.notifier;
            if (wakeUp != null) {
                wakeUp.run();
            }
        }

        /**
         * Stops this consumer (see {@code stop()}) and then closes its checkpointer.
         */
        void close() {
            this.stop();
            this.checkpointer.close();
        }

        /**
         * Decides, from the current state, which task (if any) this consumer should run next.
         * Nothing happens while a previous task is still pending. Otherwise:
         * <ul>
         * <li>{@code NEW} / {@code EXPIRED}: the task obtains a shard iterator;</li>
         * <li>{@code CONSUME}: the task is the reusable records-polling task;</li>
         * <li>{@code SLEEP}: no task; the state returns to {@code CONSUME} once the sleep deadline
         * has passed;</li>
         * <li>{@code STOP}: no task; the stop is logged at INFO level.</li>
         * </ul>
         * When a task was selected, the notifier (if set) is run, and when the adapter's
         * {@code concurrency} is 0 the task is also submitted to the adapter's consumer executor.
         */
        void execute() {
            if (this.task != null) {
                // The previous task has not finished yet.
                return;
            }

            switch (this.state) {
                case NEW:
                case EXPIRED:
                    this.task = this::acquireShardIterator;
                    break;
                case CONSUME:
                    this.task = this.processTask;
                    break;
                case SLEEP:
                    if (System.currentTimeMillis() >= this.sleepUntil) {
                        this.state = ConsumerState.CONSUME;
                    }
                    this.task = null;
                    break;
                case STOP:
                    this.logStopping();
                    this.task = null;
                    break;
            }

            if (this.task == null) {
                return;
            }
            if (this.notifier != null) {
                this.notifier.run();
            }
            if (KinesisMessageDrivenChannelAdapter.this.concurrency == 0) {
                KinesisMessageDrivenChannelAdapter.this.consumerExecutor.execute(this.task);
            }
        }

        /**
         * Task body for the {@code NEW} and {@code EXPIRED} states: prepares the shard offset,
         * requests a shard iterator from Kinesis and, unless the consumer was stopped in the
         * meantime, moves to {@code CONSUME}. The pending task is always cleared at the end.
         */
        private void acquireShardIterator() {
            try {
                if (this.shardOffset.isReset()) {
                    // NOTE(review): this branch runs on every invocation, so the stored checkpoint
                    // is also removed when the iterator is re-acquired after EXPIRED - confirm
                    // that this is intended.
                    this.checkpointer.remove();
                }
                else {
                    String storedCheckpoint = this.checkpointer.getCheckpoint();
                    if (storedCheckpoint != null) {
                        this.shardOffset.setSequenceNumber(storedCheckpoint);
                        this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                    }
                }

                if (KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled() && this.state == ConsumerState.NEW) {
                    KinesisMessageDrivenChannelAdapter.this.logger.info("The [" + this + "] has been started.");
                }

                GetShardIteratorRequest iteratorRequest = this.shardOffset.toShardIteratorRequest();
                this.shardIterator =
                        KinesisMessageDrivenChannelAdapter.this.amazonKinesis
                                .getShardIterator(iteratorRequest)
                                .getShardIterator();

                if (ConsumerState.STOP != this.state) {
                    this.state = ConsumerState.CONSUME;
                }
            }
            finally {
                this.task = null;
            }
        }

        /**
         * Logs, at INFO level, that this consumer is stopping. A null shard iterator is reported
         * as a closed and exhausted shard together with the stored checkpoint.
         */
        private void logStopping() {
            if (!KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                return;
            }
            if (this.shardIterator == null) {
                KinesisMessageDrivenChannelAdapter.this.logger.info("Stopping the [" + this + "] on the checkpoint [" + this.checkpointer.getCheckpoint() + "] because the shard has been CLOSED and exhausted.");
            }
            else {
                KinesisMessageDrivenChannelAdapter.this.logger.info("Stopping the [" + this + "].");
            }
        }

        /**
         * Builds the records-polling task. One run of the task issues a single GetRecords call
         * with the current shard iterator and the adapter's records limit and passes a non-empty
         * result to {@code processRecords}. Afterwards, whether or not processing failed, it:
         * clears the thread's attributes holder; adopts the next shard iterator from the result
         * (stopping the consumer when there is none); switches to {@code SLEEP} when the result
         * had no records and the consumer is not stopped; and clears the pending task.
         *
         * @return the reusable polling task
         */
        private Runnable processTask() {
            return () -> {
                GetRecordsRequest request = new GetRecordsRequest();
                request.setShardIterator(this.shardIterator);
                request.setLimit(Integer.valueOf(KinesisMessageDrivenChannelAdapter.this.recordsLimit));

                GetRecordsResult result = null;
                try {
                    result = this.getRecords(request);
                    if (result != null) {
                        List<Record> fetched = result.getRecords();
                        if (!fetched.isEmpty()) {
                            this.processRecords(fetched);
                        }
                    }
                }
                finally {
                    attributesHolder.remove();
                    if (result != null) {
                        this.shardIterator = result.getNextShardIterator();
                        if (this.shardIterator == null) {
                            // No next iterator in the result: nothing more can be read.
                            this.stop();
                        }
                        boolean stopped = ConsumerState.STOP == this.state;
                        if (!stopped && result.getRecords().isEmpty()) {
                            if (KinesisMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                                KinesisMessageDrivenChannelAdapter.this.logger.debug("No records for [" + this + "] on sequenceNumber [" + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds.");
                            }
                            this.prepareSleepState();
                        }
                    }
                    this.task = null;
                }
            };
        }

        /**
         * Calls Kinesis GetRecords and turns the two recoverable failures into a state change
         * instead of an exception: an expired iterator moves the consumer to {@code EXPIRED}, and
         * a throttled request puts it to sleep via {@code prepareSleepState()}.
         * <p>
         * Neither transition is applied when the consumer is already in {@code STOP}; otherwise a
         * {@code stop()} that arrives while the request is in flight would be silently undone.
         * This matches the {@code STOP} guards on the other state transitions of this class.
         *
         * @param getRecordsRequest the request to send
         * @return the GetRecords result, or {@code null} when the iterator had expired or the
         * request was throttled
         */
        private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
            try {
                return KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest);
            }
            catch (ExpiredIteratorException e) {
                if (KinesisMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                    KinesisMessageDrivenChannelAdapter.this.logger.info("Shard iterator for [" + this + "] expired.\nA new one will be started from the check pointed sequence number.");
                }
                if (ConsumerState.STOP != this.state) {
                    this.state = ConsumerState.EXPIRED;
                }
            }
            catch (ProvisionedThroughputExceededException e) {
                if (KinesisMessageDrivenChannelAdapter.this.logger.isWarnEnabled()) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn("GetRecords request throttled for [" + this + "] with the reason: " + e.getErrorMessage());
                }
                if (ConsumerState.STOP != this.state) {
                    this.prepareSleepState();
                }
            }
            return null;
        }

        /**
         * Puts this consumer into the {@code SLEEP} state for the adapter's {@code consumerBackoff}
         * milliseconds, counted from now.
         */
        private void prepareSleepState() {
            // The deadline is written before the state, so a thread that observes SLEEP also
            // observes the matching deadline (both fields are volatile).
            this.sleepUntil = System.currentTimeMillis() + (long)KinesisMessageDrivenChannelAdapter.this.consumerBackoff;
            this.state = ConsumerState.SLEEP;
        }

        /**
         * Sends the given records downstream according to the adapter's listener mode and then
         * checkpoints according to its checkpoint mode.
         * <ul>
         * <li>{@code record} listener mode: one message per record; with the {@code record}
         * checkpoint mode each record is checkpointed right after it was sent.</li>
         * <li>{@code batch} listener mode: a single message for all records, see
         * {@code prepareBatchMessage}.</li>
         * </ul>
         * After sending, the {@code batch} checkpoint mode checkpoints immediately and the
         * {@code periodic} mode checkpoints when the current interval has elapsed.
         *
         * @param records the records of one GetRecords result; must not be empty, because the
         * sequence number of the last record is registered as the highest sequence
         */
        private void processRecords(List<Record> records) {
            if (KinesisMessageDrivenChannelAdapter.this.logger.isTraceEnabled()) {
                KinesisMessageDrivenChannelAdapter.this.logger.trace("Processing records: " + records + " for [" + this + "]");
            }
            this.checkpointer.setHighestSequence(records.get(records.size() - 1).getSequenceNumber());
            switch (KinesisMessageDrivenChannelAdapter.this.listenerMode) {
                case record: {
                    for (Record record : records) {
                        this.performSend(this.prepareMessageForRecord(record), record);
                        if (CheckpointMode.record.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                            this.checkpointer.checkpoint(record.getSequenceNumber());
                        }
                    }
                    break;
                }
                case batch: {
                    this.performSend(this.prepareBatchMessage(records), records);
                    break;
                }
            }
            if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
                this.checkpointer.checkpoint();
            }
            else if (CheckpointMode.periodic.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)
                    && System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
                this.checkpointer.checkpoint();
                this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval;
            }
        }

        /**
         * Builds the single message sent in {@code batch} listener mode. The payload depends on
         * the adapter configuration:
         * <ul>
         * <li>with an embedded-headers mapper: a list of fully built messages, one per record, as
         * produced by {@code prepareMessageForRecord} (which already extracts embedded headers and
         * applies the converter to the extracted payload). Unbuilt message builders are never used
         * as payload elements, and the converter is not applied a second time to the raw bytes
         * that still contain the embedded headers;</li>
         * <li>otherwise, with a converter: a list of converted record data, with the partition
         * keys and sequence numbers of the records as list-valued headers;</li>
         * <li>otherwise: the records themselves.</li>
         * </ul>
         */
        private AbstractIntegrationMessageBuilder<?> prepareBatchMessage(List<Record> records) {
            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                List<Message<Object>> messages = records.stream()
                        .map(this::prepareMessageForRecord)
                        .map(builder -> builder.build())
                        .collect(Collectors.toList());
                return KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(messages);
            }
            if (KinesisMessageDrivenChannelAdapter.this.converter != null) {
                List<String> partitionKeys = new ArrayList<>(records.size());
                List<String> sequenceNumbers = new ArrayList<>(records.size());
                List<Object> converted = new ArrayList<>(records.size());
                for (Record record : records) {
                    partitionKeys.add(record.getPartitionKey());
                    sequenceNumbers.add(record.getSequenceNumber());
                    converted.add(KinesisMessageDrivenChannelAdapter.this.converter.convert(record.getData().array()));
                }
                return KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory()
                        .withPayload(converted)
                        .setHeader("aws_receivedPartitionKey", partitionKeys)
                        .setHeader("aws_receivedSequenceNumber", sequenceNumbers);
            }
            return KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(records);
        }

        /**
         * Creates the message builder for a single record. The payload starts as the backing array
         * of the record's data buffer. When an embedded-headers mapper is configured, it is asked
         * to turn those bytes into a message whose payload replaces the raw bytes; if that fails,
         * a warning is logged and the raw bytes are kept. A payload that is still a byte array is
         * then passed through the converter, when one is configured. The builder carries the
         * record's partition key and sequence number as headers, the record itself under
         * {@code sourceData} when {@code bindSourceRecord} is set, and any embedded headers that
         * are not already present.
         *
         * @param record the Kinesis record to wrap
         * @return the builder for the outgoing message
         */
        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record) {
            byte[] rawData = record.getData().array();
            Object payload = rawData;
            Message<?> embedded = null;

            if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    embedded = KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage(rawData);
                    payload = embedded.getPayload();
                }
                catch (Exception e) {
                    KinesisMessageDrivenChannelAdapter.this.logger.warn("Could not parse embedded headers. Remain payload untouched.", e);
                }
            }

            boolean convertible = payload instanceof byte[] && KinesisMessageDrivenChannelAdapter.this.converter != null;
            if (convertible) {
                payload = KinesisMessageDrivenChannelAdapter.this.converter.convert((byte[]) payload);
            }

            AbstractIntegrationMessageBuilder<Object> builder =
                    KinesisMessageDrivenChannelAdapter.this.getMessageBuilderFactory()
                            .withPayload(payload)
                            .setHeader("aws_receivedPartitionKey", record.getPartitionKey())
                            .setHeader("aws_receivedSequenceNumber", record.getSequenceNumber());

            if (KinesisMessageDrivenChannelAdapter.this.bindSourceRecord) {
                builder.setHeader("sourceData", record);
            }
            if (embedded != null) {
                // Headers set above win over embedded headers with the same name.
                builder.copyHeadersIfAbsent(embedded.getHeaders());
            }
            return builder;
        }

        /**
         * Completes the given builder with the stream and shard headers (plus the checkpointer
         * header in {@code manual} checkpoint mode), builds the message, records the attributes
         * for the raw record if necessary, and sends the message. An exception thrown by the send
         * is logged at INFO level together with its stack trace and is not propagated.
         *
         * @param messageBuilder the builder of the message to send
         * @param rawRecord the record, or the list of records, the message was created from
         */
        private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord) {
            messageBuilder
                    .setHeader("aws_receivedStream", this.shardOffset.getStream())
                    .setHeader("aws_shard", this.shardOffset.getShard());

            if (CheckpointMode.manual == KinesisMessageDrivenChannelAdapter.this.checkpointMode) {
                messageBuilder.setHeader("aws_checkpointer", this.checkpointer);
            }

            Message<?> outbound = messageBuilder.build();
            KinesisMessageDrivenChannelAdapter.this.setAttributesIfNecessary(rawRecord, outbound);
            try {
                KinesisMessageDrivenChannelAdapter.this.sendMessage(outbound);
            }
            catch (Exception e) {
                KinesisMessageDrivenChannelAdapter.this.logger.info("Got an exception during sending a '" + outbound + "'\nfor the '" + rawRecord + "'.\nConsider to use 'errorChannel' flow for the compensation logic.", e);
            }
        }

        /**
         * Describes this consumer by its shard offset and its current state.
         */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder("ShardConsumer{shardOffset=");
            description.append(this.shardOffset);
            description.append(", state=").append(this.state);
            description.append('}');
            return description.toString();
        }
    }

    private final class ConsumerDispatcher
    implements SchedulingAwareRunnable {
        private final Set<String> inReshardingProcess = new HashSet<String>();

        private ConsumerDispatcher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            while (KinesisMessageDrivenChannelAdapter.this.active) {
                for (String stream : KinesisMessageDrivenChannelAdapter.this.inResharding) {
                    if (!this.inReshardingProcess.add(stream)) continue;
                    if (KinesisMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                        KinesisMessageDrivenChannelAdapter.this.logger.debug((Object)("Resharding has happened for stream [" + stream + "]. Rebalancing..."));
                    }
                    KinesisMessageDrivenChannelAdapter.this.populateShardsForStream(stream, null);
                }
                Iterator iterator = KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator();
                while (iterator.hasNext()) {
                    ShardConsumer shardConsumer = (ShardConsumer)iterator.next();
                    shardConsumer.execute();
                    if (ConsumerState.STOP != shardConsumer.state) continue;
                    iterator.remove();
                    if (KinesisMessageDrivenChannelAdapter.this.streams == null || shardConsumer.shardIterator != null) continue;
                    KinesisShardOffset shardOffset = shardConsumer.shardOffset;
                    String stream = shardOffset.getStream();
                    if (!KinesisMessageDrivenChannelAdapter.this.inResharding.add(stream)) continue;
                    this.inReshardingProcess.remove(stream);
                    Set set = KinesisMessageDrivenChannelAdapter.this.shardOffsets;
                    synchronized (set) {
                        KinesisMessageDrivenChannelAdapter.this.shardOffsets.remove(shardOffset);
                    }
                }
                try {
                    Thread.sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("ConsumerDispatcher Thread [" + this + "] has been interrupted", e);
                }
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }
}

