/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskType;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Task that invokes {@link IRecordProcessor#shutdown(ShutdownInput)} for a single shard and,
 * when the shutdown reason is {@link ShutdownReason#TERMINATE}, verifies that the application
 * checkpointed at {@link ExtendedSequenceNumber#SHARD_END} and then asks the configured
 * {@link ShardSyncStrategy} to run its consumer-shutdown hook.
 *
 * <p>Failures are never propagated out of {@link #call()}; they are logged, followed by a
 * backoff sleep, and reported through the returned {@link TaskResult}.
 */
class ShutdownTask
implements ITask {
    private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ShutdownReason reason;
    // The following five fields are stored by the constructor but are not read anywhere in this class.
    private final IKinesisProxy kinesisProxy;
    private final ILeaseManager<KinesisClientLease> leaseManager;
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;
    private final TaskType taskType = TaskType.SHUTDOWN;
    // How long call() sleeps after a failure before returning the failed TaskResult.
    private final long backoffTimeMillis;
    private final GetRecordsCache getRecordsCache;
    // Stored by the constructor; not read in this class (shard syncing goes through shardSyncStrategy).
    private final ShardSyncer shardSyncer;
    private final ShardSyncStrategy shardSyncStrategy;

    /**
     * Creates a shutdown task for one shard.
     *
     * @param shardInfo identifies the shard (id and concurrency token are used in log messages)
     * @param recordProcessor processor whose {@code shutdown} callback is invoked
     * @param recordProcessorCheckpointer checkpointer handed to the processor and inspected afterwards
     * @param reason why the processor is being shut down
     * @param kinesisProxy stored only; not used by this class
     * @param initialPositionInStream stored only; not used by this class
     * @param cleanupLeasesOfCompletedShards stored only; not used by this class
     * @param ignoreUnexpectedChildShards stored only; not used by this class
     * @param leaseManager stored only; not used by this class
     * @param backoffTimeMillis time to sleep, in milliseconds, after a failed shutdown attempt
     * @param getRecordsCache record retrieval cache that is shut down after the processor callback
     * @param shardSyncer stored only; not used by this class
     * @param shardSyncStrategy strategy whose {@code onShardConsumerShutDown()} is invoked on TERMINATE
     */
    ShutdownTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownReason reason, IKinesisProxy kinesisProxy, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, ILeaseManager<KinesisClientLease> leaseManager, long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.reason = reason;
        this.kinesisProxy = kinesisProxy;
        this.initialPositionInStream = initialPositionInStream;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
        this.leaseManager = leaseManager;
        this.backoffTimeMillis = backoffTimeMillis;
        this.getRecordsCache = getRecordsCache;
        this.shardSyncer = shardSyncer;
        this.shardSyncStrategy = shardSyncStrategy;
    }

    /**
     * Runs the shutdown sequence for the shard.
     *
     * <ol>
     *   <li>On TERMINATE, records the current largest permitted checkpoint as the sequence number
     *       at shard end and raises the largest permitted checkpoint to SHARD_END.</li>
     *   <li>Invokes the record processor's {@code shutdown} callback and records its latency.</li>
     *   <li>On TERMINATE, fails if the last checkpoint is not SHARD_END.</li>
     *   <li>Shuts down the {@link GetRecordsCache}.</li>
     *   <li>On TERMINATE, calls {@code shardSyncStrategy.onShardConsumerShutDown()} and fails if
     *       its result carries an exception.</li>
     * </ol>
     *
     * @return a {@link TaskResult} wrapping {@code null} on success, or the caught exception on
     *         failure (after sleeping for the configured backoff)
     */
    @Override
    public TaskResult call() {
        // Distinguishes failures raised inside the processor-callback section from later ones,
        // purely to pick the error log message.
        boolean applicationException = false;
        try {
            if (this.reason == ShutdownReason.TERMINATE) {
                this.recordProcessorCheckpointer.setSequenceNumberAtShardEnd(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
                this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
            }
            LOG.debug("Invoking shutdown() for shard " + this.shardInfo.getShardId() + ", concurrencyToken " + this.shardInfo.getConcurrencyToken() + ". Shutdown reason: " + this.reason);
            ShutdownInput shutdownInput = new ShutdownInput().withShutdownReason(this.reason).withCheckpointer(this.recordProcessorCheckpointer);
            long recordProcessorStartTimeMillis = System.currentTimeMillis();
            try {
                this.recordProcessor.shutdown(shutdownInput);
                ExtendedSequenceNumber lastCheckpointValue = this.recordProcessorCheckpointer.getLastCheckpointValue();
                boolean checkpointedAtShardEnd = lastCheckpointValue != null && lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
                if (this.reason == ShutdownReason.TERMINATE && !checkpointedAtShardEnd) {
                    throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + this.shardInfo.getShardId() + ". Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.");
                }
                LOG.debug("Shutting down retrieval strategy.");
                this.getRecordsCache.shutdown();
                LOG.debug("Record processor completed shutdown() for shard " + this.shardInfo.getShardId());
            }
            catch (Exception e) {
                applicationException = true;
                throw e;
            }
            finally {
                // Latency is recorded whether or not the callback section succeeded.
                MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
            }
            if (this.reason == ShutdownReason.TERMINATE) {
                LOG.debug("Looking for child shards of shard " + this.shardInfo.getShardId());
                TaskResult result = this.shardSyncStrategy.onShardConsumerShutDown();
                if (result.getException() != null) {
                    LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + this.shardInfo.getShardId());
                    throw result.getException();
                }
                LOG.debug("Finished checking for child shards of shard " + this.shardInfo.getShardId());
            }
            return new TaskResult(null);
        }
        catch (Exception e) {
            if (applicationException) {
                LOG.error("Application exception. ", e);
            } else {
                LOG.error("Caught exception: ", e);
            }
            this.sleepForBackoff();
            return new TaskResult(e);
        }
    }

    /**
     * Sleeps for {@code backoffTimeMillis}. If the sleep is interrupted, the interruption is
     * logged and the thread's interrupt status is restored so that the code running this task
     * can still observe the interruption request.
     */
    private void sleepForBackoff() {
        try {
            Thread.sleep(this.backoffTimeMillis);
        }
        catch (InterruptedException ie) {
            LOG.debug("Interrupted sleep", ie);
            // Thread.sleep clears the interrupt flag when it throws; put it back.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the type of this task, always {@link TaskType#SHUTDOWN}
     */
    @Override
    public TaskType getTaskType() {
        return this.taskType;
    }

    /**
     * @return the shutdown reason this task was created with
     */
    @VisibleForTesting
    ShutdownReason getReason() {
        return this.reason;
    }
}

