/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.lifecycle;

import java.util.List;
import java.util.ListIterator;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
public class ProcessTask
implements ConsumerTask {
    private static final Logger log = LoggerFactory.getLogger(ProcessTask.class);
    private static final String PROCESS_TASK_OPERATION = "ProcessTask";
    private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
    private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
    private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
    private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
    private final ShardInfo shardInfo;
    private final ShardRecordProcessor shardRecordProcessor;
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
    private final TaskType taskType = TaskType.PROCESS;
    private final long backoffTimeMillis;
    private final Shard shard;
    private final ThrottlingReporter throttlingReporter;
    private final boolean shouldCallProcessRecordsEvenForEmptyRecordList;
    private final long idleTimeInMilliseconds;
    private final ProcessRecordsInput processRecordsInput;
    private final MetricsFactory metricsFactory;
    private final AggregatorUtil aggregatorUtil;

    public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardDetector shardDetector, @NonNull ThrottlingReporter throttlingReporter, ProcessRecordsInput processRecordsInput, boolean shouldCallProcessRecordsEvenForEmptyRecordList, long idleTimeInMilliseconds, @NonNull AggregatorUtil aggregatorUtil, @NonNull MetricsFactory metricsFactory) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        if (shardRecordProcessor == null) {
            throw new NullPointerException("shardRecordProcessor");
        }
        if (recordProcessorCheckpointer == null) {
            throw new NullPointerException("recordProcessorCheckpointer");
        }
        if (throttlingReporter == null) {
            throw new NullPointerException("throttlingReporter");
        }
        if (aggregatorUtil == null) {
            throw new NullPointerException("aggregatorUtil");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        this.shardInfo = shardInfo;
        this.shardRecordProcessor = shardRecordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.backoffTimeMillis = backoffTimeMillis;
        this.throttlingReporter = throttlingReporter;
        this.processRecordsInput = processRecordsInput;
        this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList;
        this.idleTimeInMilliseconds = idleTimeInMilliseconds;
        this.metricsFactory = metricsFactory;
        this.shard = !skipShardSyncAtWorkerInitializationIfLeasesExist ? shardDetector.shard(shardInfo.shardId()) : null;
        if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) {
            log.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records in the event of resharding will not be dropped during deaggregation of Amazon Kinesis records.");
        }
        this.aggregatorUtil = aggregatorUtil;
        this.recordProcessorCheckpointer.checkpointer().operation(PROCESS_TASK_OPERATION);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TaskResult call() {
        MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, PROCESS_TASK_OPERATION);
        MetricsUtil.addShardId(scope, this.shardInfo.shardId());
        long startTimeMillis = System.currentTimeMillis();
        boolean success = false;
        try {
            TaskResult taskResult;
            RuntimeException exception;
            block12: {
                scope.addData(RECORDS_PROCESSED_METRIC, 0.0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                scope.addData(DATA_BYTES_PROCESSED_METRIC, 0.0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
                exception = null;
                if (this.processRecordsInput.millisBehindLatest() != null) {
                    scope.addData(MILLIS_BEHIND_LATEST_METRIC, this.processRecordsInput.millisBehindLatest().longValue(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
                }
                if (!this.processRecordsInput.isAtShardEnd() || !this.processRecordsInput.records().isEmpty()) break block12;
                log.info("Reached end of shard {} and have no records to process", (Object)this.shardInfo.shardId());
                TaskResult taskResult2 = new TaskResult(null, true);
                return taskResult2;
            }
            try {
                this.throttlingReporter.success();
                List<KinesisClientRecord> records = this.deaggregateAnyKplRecords(this.processRecordsInput.records());
                if (!records.isEmpty()) {
                    scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
                }
                this.recordProcessorCheckpointer.largestPermittedCheckpointValue(this.filterAndGetMaxExtendedSequenceNumber(scope, records, this.recordProcessorCheckpointer.lastCheckpointValue(), this.recordProcessorCheckpointer.largestPermittedCheckpointValue()));
                if (this.shouldCallProcessRecords(records)) {
                    this.callProcessRecords(this.processRecordsInput, records);
                }
                success = true;
            }
            catch (RuntimeException e) {
                log.error("ShardId {}: Caught exception: ", (Object)this.shardInfo.shardId(), (Object)e);
                exception = e;
                this.backoff();
            }
            if (this.processRecordsInput.isAtShardEnd()) {
                log.info("Reached end of shard {}, and processed {} records", (Object)this.shardInfo.shardId(), (Object)this.processRecordsInput.records().size());
                taskResult = new TaskResult(null, true);
                return taskResult;
            }
            taskResult = new TaskResult(exception);
            return taskResult;
        }
        finally {
            MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(scope);
        }
    }

    private List<KinesisClientRecord> deaggregateAnyKplRecords(List<KinesisClientRecord> records) {
        if (this.shard == null) {
            return this.aggregatorUtil.deaggregate(records);
        }
        return this.aggregatorUtil.deaggregate(records, this.shard.hashKeyRange().startingHashKey(), this.shard.hashKeyRange().endingHashKey());
    }

    private void backoff() {
        try {
            Thread.sleep(this.backoffTimeMillis);
        }
        catch (InterruptedException ie) {
            log.debug("{}: Sleep was interrupted", (Object)this.shardInfo.shardId(), (Object)ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void callProcessRecords(ProcessRecordsInput input, List<KinesisClientRecord> records) {
        log.debug("Calling application processRecords() with {} records from {}", (Object)records.size(), (Object)this.shardInfo.shardId());
        ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()).checkpointer(this.recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
        MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, PROCESS_TASK_OPERATION);
        MetricsUtil.addShardId(scope, this.shardInfo.shardId());
        long startTime = System.currentTimeMillis();
        try {
            this.shardRecordProcessor.processRecords(processRecordsInput);
        }
        catch (Exception e) {
            log.error("ShardId {}: Application processRecords() threw an exception when processing shard ", (Object)this.shardInfo.shardId(), (Object)e);
            log.error("ShardId {}: Skipping over the following data records: {}", (Object)this.shardInfo.shardId(), records);
        }
        finally {
            MetricsUtil.addLatency(scope, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, startTime, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(scope);
        }
    }

    private boolean shouldCallProcessRecords(List<KinesisClientRecord> records) {
        return !records.isEmpty() || this.shouldCallProcessRecordsEvenForEmptyRecordList;
    }

    private void handleNoRecords(long startTimeMillis) {
        log.debug("Kinesis didn't return any records for shard {}", (Object)this.shardInfo.shardId());
        long sleepTimeMillis = this.idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis);
        if (sleepTimeMillis > 0L) {
            sleepTimeMillis = Math.max(sleepTimeMillis, this.idleTimeInMilliseconds);
            try {
                log.debug("Sleeping for {} ms since there were no new records in shard {}", (Object)sleepTimeMillis, (Object)this.shardInfo.shardId());
                Thread.sleep(sleepTimeMillis);
            }
            catch (InterruptedException e) {
                log.debug("ShardId {}: Sleep was interrupted", (Object)this.shardInfo.shardId());
            }
        }
    }

    @Override
    public TaskType taskType() {
        return this.taskType;
    }

    private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(MetricsScope scope, List<KinesisClientRecord> records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber lastLargestPermittedCheckpointValue) {
        ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue;
        ListIterator<KinesisClientRecord> recordIterator = records.listIterator();
        while (recordIterator.hasNext()) {
            KinesisClientRecord record = recordIterator.next();
            ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(record.sequenceNumber(), record.subSequenceNumber());
            if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
                recordIterator.remove();
                log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", (Object)extendedSequenceNumber, (Object)lastCheckpointValue);
                continue;
            }
            if (largestExtendedSequenceNumber == null || largestExtendedSequenceNumber.compareTo(extendedSequenceNumber) < 0) {
                largestExtendedSequenceNumber = extendedSequenceNumber;
            }
            scope.addData(DATA_BYTES_PROCESSED_METRIC, record.data().limit(), StandardUnit.BYTES, MetricsLevel.SUMMARY);
        }
        return largestExtendedSequenceNumber;
    }
}

