/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.lifecycle;

import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
public class InitializeTask
implements ConsumerTask {
    private static final Logger log = LoggerFactory.getLogger(InitializeTask.class);
    private static final String INITIALIZE_TASK_OPERATION = "InitializeTask";
    private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize";
    @NonNull
    private final ShardInfo shardInfo;
    @NonNull
    private final ShardRecordProcessor shardRecordProcessor;
    @NonNull
    private final Checkpointer checkpoint;
    @NonNull
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
    @NonNull
    private final InitialPositionInStreamExtended initialPositionInStream;
    @NonNull
    private final RecordsPublisher cache;
    private final long backoffTimeMillis;
    @NonNull
    private final MetricsFactory metricsFactory;
    private final TaskType taskType = TaskType.INITIALIZE;

    @Override
    public TaskResult call() {
        boolean applicationException = false;
        Exception exception = null;
        try {
            log.debug("Initializing ShardId {}", (Object)this.shardInfo);
            Checkpoint initialCheckpointObject = this.checkpoint.getCheckpointObject(this.shardInfo.shardId());
            ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint();
            log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", new Object[]{this.shardInfo.shardId(), initialCheckpoint, this.initialPositionInStream});
            this.cache.start(initialCheckpoint, this.initialPositionInStream);
            this.recordProcessorCheckpointer.largestPermittedCheckpointValue(initialCheckpoint);
            this.recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);
            log.debug("Calling the record processor initialize().");
            InitializationInput initializationInput = InitializationInput.builder().shardId(this.shardInfo.shardId()).extendedSequenceNumber(initialCheckpoint).pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint()).build();
            MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, INITIALIZE_TASK_OPERATION);
            long startTime = System.currentTimeMillis();
            try {
                this.shardRecordProcessor.initialize(initializationInput);
                log.debug("Record processor initialize() completed.");
            }
            catch (Exception e) {
                applicationException = true;
                throw e;
            }
            finally {
                MetricsUtil.addLatency(scope, RECORD_PROCESSOR_INITIALIZE_METRIC, startTime, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(scope);
            }
            return new TaskResult(null);
        }
        catch (Exception e) {
            if (applicationException) {
                log.error("Application initialize() threw exception: ", (Throwable)e);
            } else {
                log.error("Caught exception: ", (Throwable)e);
            }
            exception = e;
            try {
                Thread.sleep(this.backoffTimeMillis);
            }
            catch (InterruptedException ie) {
                log.debug("Interrupted sleep", (Throwable)ie);
            }
            return new TaskResult(exception);
        }
    }

    @Override
    public TaskType taskType() {
        return this.taskType;
    }

    public InitializeTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull Checkpointer checkpoint, @NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer, @NonNull InitialPositionInStreamExtended initialPositionInStream, @NonNull RecordsPublisher cache, long backoffTimeMillis, @NonNull MetricsFactory metricsFactory) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        if (shardRecordProcessor == null) {
            throw new NullPointerException("shardRecordProcessor");
        }
        if (checkpoint == null) {
            throw new NullPointerException("checkpoint");
        }
        if (recordProcessorCheckpointer == null) {
            throw new NullPointerException("recordProcessorCheckpointer");
        }
        if (initialPositionInStream == null) {
            throw new NullPointerException("initialPositionInStream");
        }
        if (cache == null) {
            throw new NullPointerException("cache");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        this.shardInfo = shardInfo;
        this.shardRecordProcessor = shardRecordProcessor;
        this.checkpoint = checkpoint;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.initialPositionInStream = initialPositionInStream;
        this.cache = cache;
        this.backoffTimeMillis = backoffTimeMillis;
        this.metricsFactory = metricsFactory;
    }
}

