/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
public class ShutdownTask
implements ConsumerTask {
    private static final Logger log = LoggerFactory.getLogger(ShutdownTask.class);
    private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
    @VisibleForTesting
    static final int RETRY_RANDOM_MAX_RANGE = 30;
    @NonNull
    private final ShardInfo shardInfo;
    @NonNull
    private final ShardDetector shardDetector;
    @NonNull
    private final ShardRecordProcessor shardRecordProcessor;
    @NonNull
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
    @NonNull
    private final ShutdownReason reason;
    @NonNull
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;
    @NonNull
    private final LeaseCoordinator leaseCoordinator;
    private final long backoffTimeMillis;
    @NonNull
    private final RecordsPublisher recordsPublisher;
    @NonNull
    private final HierarchicalShardSyncer hierarchicalShardSyncer;
    @NonNull
    private final MetricsFactory metricsFactory;
    private final TaskType taskType = TaskType.SHUTDOWN;
    private final List<ChildShard> childShards;
    @NonNull
    private final StreamIdentifier streamIdentifier;
    @NonNull
    private final LeaseCleanupManager leaseCleanupManager;
    private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TaskResult call() {
        Exception exception;
        this.recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION);
        MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, SHUTDOWN_TASK_OPERATION);
        try {
            log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", new Object[]{leaseKeyProvider.apply(this.shardInfo), this.childShards, this.shardInfo.concurrencyToken(), this.reason});
            long startTime = System.currentTimeMillis();
            Lease currentShardLease = this.leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(this.shardInfo));
            Runnable leaseLostAction = () -> this.shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
            if (this.reason == ShutdownReason.SHARD_END) {
                try {
                    this.takeShardEndAction(currentShardLease, scope, startTime);
                }
                catch (InvalidStateException e) {
                    log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", (Object)leaseKeyProvider.apply(this.shardInfo), (Object)e);
                    this.dropLease(currentShardLease);
                    this.throwOnApplicationException(leaseLostAction, scope, startTime);
                }
            } else {
                this.throwOnApplicationException(leaseLostAction, scope, startTime);
            }
            log.debug("Shutting down retrieval strategy for shard {}.", (Object)leaseKeyProvider.apply(this.shardInfo));
            this.recordsPublisher.shutdown();
            log.debug("Record processor completed shutdown() for shard {}", (Object)leaseKeyProvider.apply(this.shardInfo));
            TaskResult taskResult = new TaskResult(null);
            return taskResult;
        }
        catch (Exception e) {
            if (e instanceof CustomerApplicationException) {
                log.error("Shard {}: Application exception. ", (Object)leaseKeyProvider.apply(this.shardInfo), (Object)e);
            } else {
                log.error("Shard {}: Caught exception: ", (Object)leaseKeyProvider.apply(this.shardInfo), (Object)e);
            }
            exception = e;
            try {
                Thread.sleep(this.backoffTimeMillis);
            }
            catch (InterruptedException ie) {
                log.debug("Shard {}: Interrupted sleep", (Object)leaseKeyProvider.apply(this.shardInfo), (Object)ie);
            }
        }
        finally {
            MetricsUtil.endScope(scope);
        }
        return new TaskResult(exception);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void takeShardEndAction(Lease currentShardLease, MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
        LeasePendingDeletion leasePendingDeletion;
        if (currentShardLease == null) {
            throw new InvalidStateException(leaseKeyProvider.apply(this.shardInfo) + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
        }
        if (!CollectionUtils.isNullOrEmpty(this.childShards)) {
            this.createLeasesForChildShardsIfNotExist();
            this.updateLeaseWithChildShards(currentShardLease);
        }
        if (!this.leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion = new LeasePendingDeletion(this.streamIdentifier, currentShardLease, this.shardInfo, this.shardDetector))) {
            boolean isSuccess = false;
            try {
                isSuccess = this.attemptShardEndCheckpointing(scope, startTime);
            }
            finally {
                if (isSuccess || CollectionUtils.isNullOrEmpty(this.childShards)) {
                    this.leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
                }
            }
        }
    }

    private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
        Lease leaseFromDdb = Optional.ofNullable(this.leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(this.shardInfo))).orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(this.shardInfo) + " does not exist."));
        if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
            this.throwOnApplicationException(() -> this.applicationCheckpointAndVerification(), scope, startTime);
        }
        return true;
    }

    private void applicationCheckpointAndVerification() {
        this.recordProcessorCheckpointer.sequenceNumberAtShardEnd(this.recordProcessorCheckpointer.largestPermittedCheckpointValue());
        this.recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
        this.shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(this.recordProcessorCheckpointer).build());
        ExtendedSequenceNumber lastCheckpointValue = this.recordProcessorCheckpointer.lastCheckpointValue();
        if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
            throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + leaseKeyProvider.apply(this.shardInfo) + ". Application must checkpoint upon shard end. See ShardRecordProcessor.shardEnded javadocs for more information.");
        }
    }

    private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, long startTime) throws CustomerApplicationException {
        try {
            action.run();
        }
        catch (Exception e) {
            throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKeyProvider.apply(this.shardInfo) + ": ", e);
        }
        finally {
            MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
        }
    }

    private void createLeasesForChildShardsIfNotExist() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        if (!CollectionUtils.isNullOrEmpty(this.childShards) && this.childShards.size() == 1) {
            boolean isValidLeaseTableState;
            ChildShard childShard = this.childShards.get(0);
            List parentLeaseKeys = childShard.parentShards().stream().map(parentShardId -> ShardInfo.getLeaseKey(this.shardInfo, parentShardId)).collect(Collectors.toList());
            if (parentLeaseKeys.size() != 2) {
                throw new InvalidStateException("Shard " + this.shardInfo.shardId() + "'s only child shard " + childShard + " does not contain other parent information.");
            }
            boolean bl = isValidLeaseTableState = Objects.isNull(this.leaseCoordinator.leaseRefresher().getLease((String)parentLeaseKeys.get(0))) == Objects.isNull(this.leaseCoordinator.leaseRefresher().getLease((String)parentLeaseKeys.get(1)));
            if (!isValidLeaseTableState) {
                if (!this.isOneInNProbability(30)) {
                    throw new BlockedOnParentShardException("Shard " + this.shardInfo.shardId() + "'s only child shard " + childShard + " has partial parent information in lease table. Hence deferring lease creation of child shard.");
                }
                throw new InvalidStateException("Shard " + this.shardInfo.shardId() + "'s only child shard " + childShard + " has partial parent information in lease table. Hence deferring lease creation of child shard.");
            }
        }
        for (ChildShard childShard : this.childShards) {
            String leaseKey = ShardInfo.getLeaseKey(this.shardInfo, childShard.shardId());
            if (this.leaseCoordinator.leaseRefresher().getLease(leaseKey) != null) continue;
            Lease leaseToCreate = this.hierarchicalShardSyncer.createLeaseForChildShard(childShard, this.shardDetector.streamIdentifier());
            this.leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
            log.info("Shard {}: Created child shard lease: {}", (Object)this.shardInfo.shardId(), (Object)leaseToCreate.leaseKey());
        }
    }

    @VisibleForTesting
    boolean isOneInNProbability(int n) {
        Random r = new Random();
        return 1 == r.nextInt(n - 1 + 1) + 1;
    }

    private void updateLeaseWithChildShards(Lease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        Set<String> childShardIds = this.childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
        Lease updatedLease = currentLease.copy();
        updatedLease.childShardIds(childShardIds);
        this.leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
        log.info("Shard {}: Updated current lease {} with child shard information: {}", new Object[]{this.shardInfo.shardId(), currentLease.leaseKey(), childShardIds});
    }

    @Override
    public TaskType taskType() {
        return this.taskType;
    }

    @VisibleForTesting
    public ShutdownReason getReason() {
        return this.reason;
    }

    private void dropLease(Lease currentLease) {
        if (currentLease == null) {
            log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", (Object)leaseKeyProvider.apply(this.shardInfo));
            return;
        }
        this.leaseCoordinator.dropLease(currentLease);
        log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
    }

    public ShutdownTask(@NonNull ShardInfo shardInfo, @NonNull ShardDetector shardDetector, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessorCheckpointer recordProcessorCheckpointer, @NonNull ShutdownReason reason, @NonNull InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, @NonNull LeaseCoordinator leaseCoordinator, long backoffTimeMillis, @NonNull RecordsPublisher recordsPublisher, @NonNull HierarchicalShardSyncer hierarchicalShardSyncer, @NonNull MetricsFactory metricsFactory, List<ChildShard> childShards, @NonNull StreamIdentifier streamIdentifier, @NonNull LeaseCleanupManager leaseCleanupManager) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        if (shardDetector == null) {
            throw new NullPointerException("shardDetector");
        }
        if (shardRecordProcessor == null) {
            throw new NullPointerException("shardRecordProcessor");
        }
        if (recordProcessorCheckpointer == null) {
            throw new NullPointerException("recordProcessorCheckpointer");
        }
        if (reason == null) {
            throw new NullPointerException("reason");
        }
        if (initialPositionInStream == null) {
            throw new NullPointerException("initialPositionInStream");
        }
        if (leaseCoordinator == null) {
            throw new NullPointerException("leaseCoordinator");
        }
        if (recordsPublisher == null) {
            throw new NullPointerException("recordsPublisher");
        }
        if (hierarchicalShardSyncer == null) {
            throw new NullPointerException("hierarchicalShardSyncer");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        if (streamIdentifier == null) {
            throw new NullPointerException("streamIdentifier");
        }
        if (leaseCleanupManager == null) {
            throw new NullPointerException("leaseCleanupManager");
        }
        this.shardInfo = shardInfo;
        this.shardDetector = shardDetector;
        this.shardRecordProcessor = shardRecordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.reason = reason;
        this.initialPositionInStream = initialPositionInStream;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
        this.leaseCoordinator = leaseCoordinator;
        this.backoffTimeMillis = backoffTimeMillis;
        this.recordsPublisher = recordsPublisher;
        this.hierarchicalShardSyncer = hierarchicalShardSyncer;
        this.metricsFactory = metricsFactory;
        this.childShards = childShards;
        this.streamIdentifier = streamIdentifier;
        this.leaseCleanupManager = leaseCleanupManager;
    }
}

