/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.aws2.ddbstream;

import java.math.BigInteger;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.aws2.ddbstream.BigIntComparisons;
import org.apache.camel.component.aws2.ddbstream.Ddb2StreamEndpoint;
import org.apache.camel.component.aws2.ddbstream.ShardIteratorHandler;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

public class Ddb2StreamConsumer
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Ddb2StreamConsumer.class);
    private final ShardIteratorHandler shardIteratorHandler;
    private String lastSeenSequenceNumber;

    public Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor) {
        this(endpoint, processor, new ShardIteratorHandler(endpoint));
    }

    Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor, ShardIteratorHandler shardIteratorHandler) {
        super((Endpoint)endpoint, processor);
        this.shardIteratorHandler = shardIteratorHandler;
    }

    protected int poll() throws Exception {
        GetRecordsResponse result;
        try {
            GetRecordsRequest.Builder req = GetRecordsRequest.builder().shardIterator(this.shardIteratorHandler.getShardIterator(null)).limit(Integer.valueOf(this.getEndpoint().getConfiguration().getMaxResultsPerRequest()));
            result = this.getClient().getRecords((GetRecordsRequest)req.build());
        }
        catch (ExpiredIteratorException e) {
            LOG.warn("Expired Shard Iterator, attempting to resume from {}", (Object)this.lastSeenSequenceNumber, (Object)e);
            GetRecordsRequest.Builder req = GetRecordsRequest.builder().shardIterator(this.shardIteratorHandler.getShardIterator(this.lastSeenSequenceNumber)).limit(Integer.valueOf(this.getEndpoint().getConfiguration().getMaxResultsPerRequest()));
            result = this.getClient().getRecords((GetRecordsRequest)req.build());
        }
        List records = result.records();
        Queue<Exchange> exchanges = this.createExchanges(records, this.lastSeenSequenceNumber);
        int processedExchangeCount = this.processBatch(CastUtils.cast(exchanges));
        this.shardIteratorHandler.updateShardIterator(result.nextShardIterator());
        if (!records.isEmpty()) {
            this.lastSeenSequenceNumber = ((Record)records.get(records.size() - 1)).dynamodb().sequenceNumber();
        }
        return processedExchangeCount;
    }

    public int processBatch(Queue<Object> exchanges) throws Exception {
        int processedExchanges = 0;
        while (!exchanges.isEmpty()) {
            final Exchange exchange = (Exchange)ObjectHelper.cast(Exchange.class, (Object)exchanges.poll());
            LOG.trace("Processing exchange [{}] started.", (Object)exchange);
            this.getAsyncProcessor().process(exchange, new AsyncCallback(){

                public void done(boolean doneSync) {
                    LOG.trace("Processing exchange [{}] done.", (Object)exchange);
                }
            });
            ++processedExchanges;
        }
        return processedExchanges;
    }

    private DynamoDbStreamsClient getClient() {
        return this.getEndpoint().getClient();
    }

    public Ddb2StreamEndpoint getEndpoint() {
        return (Ddb2StreamEndpoint)super.getEndpoint();
    }

    private Queue<Exchange> createExchanges(List<Record> records, String lastSeenSequenceNumber) {
        ArrayDeque<Exchange> exchanges = new ArrayDeque<Exchange>();
        BigIntComparisons.Conditions condition = null;
        BigInteger providedSeqNum = null;
        if (lastSeenSequenceNumber != null) {
            providedSeqNum = new BigInteger(lastSeenSequenceNumber);
            condition = BigIntComparisons.Conditions.LT;
        }
        switch (this.getEndpoint().getConfiguration().getIteratorType()) {
            case AFTER_SEQUENCE_NUMBER: {
                condition = BigIntComparisons.Conditions.LT;
                providedSeqNum = new BigInteger(this.getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber());
                break;
            }
            case AT_SEQUENCE_NUMBER: {
                condition = BigIntComparisons.Conditions.LTEQ;
                providedSeqNum = new BigInteger(this.getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber());
                break;
            }
        }
        for (Record record : records) {
            BigInteger recordSeqNum = new BigInteger(record.dynamodb().sequenceNumber());
            if (condition != null && !condition.matches(providedSeqNum, recordSeqNum)) continue;
            exchanges.add(this.getEndpoint().createExchange(record));
        }
        return exchanges;
    }
}

