/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.aws2.kinesis;

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
import org.apache.camel.component.aws2.kinesis.ReachedClosedStatusException;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class Kinesis2Consumer
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
    private String currentShardIterator;
    private boolean isShardClosed;

    public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
    }

    protected int poll() throws Exception {
        String shardIterator = this.getShardIterator();
        if (shardIterator == null) {
            return 0;
        }
        GetRecordsRequest req = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(shardIterator).limit(Integer.valueOf(this.getEndpoint().getConfiguration().getMaxResultsPerRequest())).build();
        GetRecordsResponse result = this.getClient().getRecords(req);
        Queue<Exchange> exchanges = this.createExchanges(result.records());
        int processedExchangeCount = this.processBatch(CastUtils.cast(exchanges));
        this.currentShardIterator = result.nextShardIterator();
        if (this.isShardClosed) {
            switch (this.getEndpoint().getConfiguration().getShardClosed()) {
                case ignore: {
                    LOG.warn("The shard {} is in closed state", (Object)this.currentShardIterator);
                    break;
                }
                case silent: {
                    break;
                }
                case fail: {
                    LOG.info("Shard Iterator reaches CLOSE status:{} {}", (Object)this.getEndpoint().getConfiguration().getStreamName(), (Object)this.getEndpoint().getConfiguration().getShardId());
                    throw new ReachedClosedStatusException(this.getEndpoint().getConfiguration().getStreamName(), this.getEndpoint().getConfiguration().getShardId());
                }
                default: {
                    throw new IllegalArgumentException("Unsupported shard closed strategy");
                }
            }
        }
        return processedExchangeCount;
    }

    public int processBatch(Queue<Object> exchanges) throws Exception {
        int processedExchanges = 0;
        while (!exchanges.isEmpty()) {
            Exchange exchange = (Exchange)ObjectHelper.cast(Exchange.class, (Object)exchanges.poll());
            AsyncCallback cb = this.defaultConsumerCallback(exchange, true);
            this.getAsyncProcessor().process(exchange, cb);
            ++processedExchanges;
        }
        return processedExchanges;
    }

    private KinesisClient getClient() {
        return this.getEndpoint().getClient();
    }

    public Kinesis2Endpoint getEndpoint() {
        return (Kinesis2Endpoint)super.getEndpoint();
    }

    private String getShardIterator() {
        if (this.currentShardIterator == null) {
            DescribeStreamResponse res1;
            DescribeStreamRequest req1;
            String shardId;
            if (!this.getEndpoint().getConfiguration().getShardId().isEmpty()) {
                shardId = this.getEndpoint().getConfiguration().getShardId();
                req1 = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(this.getEndpoint().getConfiguration().getStreamName()).build();
                res1 = this.getClient().describeStream(req1);
                for (Shard shard : res1.streamDescription().shards()) {
                    if (!shard.shardId().equalsIgnoreCase(this.getEndpoint().getConfiguration().getShardId())) continue;
                    this.isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
                }
            } else {
                req1 = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(this.getEndpoint().getConfiguration().getStreamName()).build();
                res1 = this.getClient().describeStream(req1);
                List shards = res1.streamDescription().shards();
                if (shards.isEmpty()) {
                    LOG.warn("There are no shards in the stream");
                    return null;
                }
                shardId = ((Shard)shards.get(0)).shardId();
                this.isShardClosed = ((Shard)shards.get(0)).sequenceNumberRange().endingSequenceNumber() != null;
            }
            LOG.debug("ShardId is: {}", (Object)shardId);
            GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder().streamName(this.getEndpoint().getConfiguration().getStreamName()).shardId(shardId).shardIteratorType(this.getEndpoint().getConfiguration().getIteratorType());
            this.resume(req);
            GetShardIteratorResponse result = this.getClient().getShardIterator((GetShardIteratorRequest)req.build());
            this.currentShardIterator = result.shardIterator();
        }
        LOG.debug("Shard Iterator is: {}", (Object)this.currentShardIterator);
        return this.currentShardIterator;
    }

    private void resume(GetShardIteratorRequest.Builder req) {
        KinesisResumeStrategy resumeStrategy = this.getEndpoint().getConfiguration().getResumeStrategy() == null ? new KinesisUserConfigurationResumeStrategy(this.getEndpoint().getConfiguration()) : this.getEndpoint().getConfiguration().getResumeStrategy();
        resumeStrategy.resume(req);
    }

    private Queue<Exchange> createExchanges(List<Record> records) {
        ArrayDeque<Exchange> exchanges = new ArrayDeque<Exchange>();
        for (Record record : records) {
            exchanges.add(this.createExchange(record));
        }
        return exchanges;
    }

    protected Exchange createExchange(Record record) {
        Exchange exchange = this.createExchange(true);
        exchange.getIn().setBody((Object)record);
        exchange.getIn().setHeader("CamelAwsKinesisApproximateArrivalTimestamp", (Object)record.approximateArrivalTimestamp());
        exchange.getIn().setHeader("CamelAwsKinesisPartitionKey", (Object)record.partitionKey());
        exchange.getIn().setHeader("CamelAwsKinesisSequenceNumber", (Object)record.sequenceNumber());
        if (record.approximateArrivalTimestamp() != null) {
            long ts = record.approximateArrivalTimestamp().getEpochSecond() * 1000L;
            exchange.getIn().setHeader("CamelMessageTimestamp", (Object)ts);
        }
        return exchange;
    }
}

