/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.reactivex.plugins.RxJavaPlugins;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.DeterministicShuffleShardSyncLeaderDecider;
import software.amazon.kinesis.coordinator.DiagnosticEventFactory;
import software.amazon.kinesis.coordinator.DiagnosticEventHandler;
import software.amazon.kinesis.coordinator.DiagnosticEventLogger;
import software.amazon.kinesis.coordinator.ExecutorStateEvent;
import software.amazon.kinesis.coordinator.GracefulShutdownContext;
import software.amazon.kinesis.coordinator.GracefulShutdownCoordinator;
import software.amazon.kinesis.coordinator.LeaderDecider;
import software.amazon.kinesis.coordinator.PeriodicShardSyncManager;
import software.amazon.kinesis.coordinator.RejectedTaskEvent;
import software.amazon.kinesis.coordinator.SchedulerCoordinatorFactory;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
import software.amazon.kinesis.lifecycle.ShardConsumerShutdownNotification;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.metrics.CloudWatchMetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class Scheduler
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Scheduler.class);

    // ---- Tuning constants -------------------------------------------------------------------
    // Value passed as the last argument when the leader decider is constructed.
    private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
    // Pause between consecutive "is the lease table still empty?" checks during initialization.
    private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3000L;
    // Lower/upper bound of the randomized total time a worker waits for the lease table to fill.
    private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
    private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30000L;
    // Minimum time between two stream/lease syncs in multi-stream mode.
    private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 60000L;
    // NOTE(review): not referenced in the visible part of this class.
    private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;

    // ---- Metrics operation / metric names used by the multi-stream sync ---------------------
    private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
    private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
    private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
    private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";

    // Helper logger used by the process loop (info() per pass, resetInfoLogging() at the end).
    private SchedulerLog slog = new SchedulerLog();

    // ---- Configuration objects handed to the constructor ------------------------------------
    private final CheckpointConfig checkpointConfig;
    private final CoordinatorConfig coordinatorConfig;
    private final LeaseManagementConfig leaseManagementConfig;
    private final LifecycleConfig lifecycleConfig;
    private final MetricsConfig metricsConfig;
    private final ProcessorConfig processorConfig;
    private final RetrievalConfig retrievalConfig;

    // ---- Values and collaborators derived from the configuration in the constructor ---------
    private final String applicationName;
    private final int maxInitializationAttempts;
    private final Checkpointer checkpoint;
    private final long shardConsumerDispatchPollIntervalMillis;
    private final long parentShardPollIntervalMillis;
    private final ExecutorService executorService;
    private final DiagnosticEventFactory diagnosticEventFactory;
    private final DiagnosticEventHandler diagnosticEventHandler;
    private final LeaseCoordinator leaseCoordinator;
    private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
    // Plain HashMap: not safe for concurrent mutation.
    private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<StreamConfig, ShardSyncTaskManager>();
    private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
    private final ShardPrioritization shardPrioritization;
    private final boolean cleanupLeasesUponShardCompletion;
    private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
    private final GracefulShutdownCoordinator gracefulShutdownCoordinator;
    private final WorkerStateChangeListener workerStateChangeListener;
    private final MetricsFactory metricsFactory;
    // Also used as the upper bound on how long shutdown waits for consumers before forcing exit.
    private final long failoverTimeMillis;
    private final long taskBackoffTimeMillis;

    // ---- Stream tracking --------------------------------------------------------------------
    private final boolean isMultiStreamMode;
    // Streams currently tracked by this worker; mutated by the multi-stream sync.
    private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
    // Both are only assigned (inside the constructor) when a multi-stream tracker is configured.
    private MultiStreamTracker multiStreamTracker;
    private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
    private final long listShardsBackoffTimeMillis;
    private final int maxListShardsRetryAttempts;
    private final LeaseRefresher leaseRefresher;
    private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
    // (sic) field name is misspelled; it holds leaseManagementConfig.ignoreUnexpectedChildShards().
    private final boolean ignoreUnexpetedChildShards;
    private final AggregatorUtil aggregatorUtil;
    private final Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider;
    private final long schedulerInitializationBackoffTimeMillis;
    private final LeaderDecider leaderDecider;
    // Stream -> instant at which it was first seen as no longer tracked (pending lease deletion).
    private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<StreamIdentifier, Instant>();
    private final LeaseCleanupManager leaseCleanupManager;
    // Shard consumers owned by this worker; shutdown is considered finished once this is empty.
    private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();

    // ---- Lifecycle state --------------------------------------------------------------------
    private volatile boolean shutdown;
    private volatile long shutdownStartTimeMillis;
    private volatile boolean shutdownComplete = false;
    // Monitor held for the whole of initialize().
    private final Object lock = new Object();
    // Measures time since the last multi-stream sync; started at the end of initialization.
    private Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
    // Set once the tracked streams have been seeded from the lease table.
    private boolean leasesSyncedOnAppInit = false;
    // Lazily created by startGracefulShutdown(); guarded by synchronized (this) on write.
    private Future<Boolean> gracefulShutdownFuture;
    // (sic) misspelled name is part of the protected surface; written under synchronized (this).
    @VisibleForTesting
    protected boolean gracefuleShutdownStarted = false;

    /**
     * Creates a scheduler from the given configuration objects, using a freshly created
     * {@link DiagnosticEventFactory}.
     *
     * <p>All arguments are required. Null validation is performed by the delegated constructor,
     * which throws a {@link NullPointerException} naming the offending parameter before any
     * statement after the {@code this(...)} call could run; repeating the checks here would be
     * dead code.
     *
     * @param checkpointConfig      checkpoint configuration
     * @param coordinatorConfig     coordinator configuration
     * @param leaseManagementConfig lease management configuration
     * @param lifecycleConfig       lifecycle configuration
     * @param metricsConfig         metrics configuration
     * @param processorConfig       record processor configuration
     * @param retrievalConfig       retrieval configuration
     * @throws NullPointerException if any argument is {@code null}
     */
    public Scheduler(@NonNull CheckpointConfig checkpointConfig, @NonNull CoordinatorConfig coordinatorConfig, @NonNull LeaseManagementConfig leaseManagementConfig, @NonNull LifecycleConfig lifecycleConfig, @NonNull MetricsConfig metricsConfig, @NonNull ProcessorConfig processorConfig, @NonNull RetrievalConfig retrievalConfig) {
        this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, new DiagnosticEventFactory());
    }

    /**
     * Wires up the scheduler: stores the configuration objects and derives every collaborator
     * (lease coordinator, checkpointer, executor, shard-sync managers, leader decider, lease
     * cleanup manager) from them.
     *
     * <p>Statement order matters: later assignments read fields set by earlier ones (for example
     * {@code leaseRefresher} comes from {@code leaseCoordinator}, and the periodic shard sync
     * manager needs {@code leaderDecider} and {@code currentStreamConfigMap}).
     *
     * @param diagnosticEventFactory factory stored for diagnostic events; injectable for tests
     * @throws NullPointerException if any argument is {@code null}; the message is the parameter name
     */
    @VisibleForTesting
    protected Scheduler(@NonNull CheckpointConfig checkpointConfig, @NonNull CoordinatorConfig coordinatorConfig, @NonNull LeaseManagementConfig leaseManagementConfig, @NonNull LifecycleConfig lifecycleConfig, @NonNull MetricsConfig metricsConfig, @NonNull ProcessorConfig processorConfig, @NonNull RetrievalConfig retrievalConfig, @NonNull DiagnosticEventFactory diagnosticEventFactory) {
        // Explicit null checks (the compiled form of the @NonNull annotations).
        if (checkpointConfig == null) {
            throw new NullPointerException("checkpointConfig");
        }
        if (coordinatorConfig == null) {
            throw new NullPointerException("coordinatorConfig");
        }
        if (leaseManagementConfig == null) {
            throw new NullPointerException("leaseManagementConfig");
        }
        if (lifecycleConfig == null) {
            throw new NullPointerException("lifecycleConfig");
        }
        if (metricsConfig == null) {
            throw new NullPointerException("metricsConfig");
        }
        if (processorConfig == null) {
            throw new NullPointerException("processorConfig");
        }
        if (retrievalConfig == null) {
            throw new NullPointerException("retrievalConfig");
        }
        if (diagnosticEventFactory == null) {
            throw new NullPointerException("diagnosticEventFactory");
        }
        this.checkpointConfig = checkpointConfig;
        this.coordinatorConfig = coordinatorConfig;
        this.leaseManagementConfig = leaseManagementConfig;
        this.lifecycleConfig = lifecycleConfig;
        this.metricsConfig = metricsConfig;
        this.processorConfig = processorConfig;
        this.retrievalConfig = retrievalConfig;
        this.applicationName = this.coordinatorConfig.applicationName();
        // appStreamTracker() is mapped with two functions: the first handles a multi-stream
        // tracker, the second a single stream config. Multi-stream mode is on in the first case.
        this.isMultiStreamMode = (Boolean)this.retrievalConfig.appStreamTracker().map(multiStreamTracker -> true, streamConfig -> false);
        // Seed the tracked-stream map. In the multi-stream case the lambda also captures the
        // tracker and its lease deletion strategy as a side effect; in the single-stream case
        // the map is a singleton map (immutable) and those two fields stay unassigned.
        this.currentStreamConfigMap = (Map)this.retrievalConfig.appStreamTracker().map(multiStreamTracker -> {
            this.multiStreamTracker = multiStreamTracker;
            this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy();
            return multiStreamTracker.streamConfigList().stream().collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
        }, streamConfig -> Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
        this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
        this.metricsFactory = this.metricsConfig.metricsFactory();
        // The lease serializer flavor follows the stream mode.
        DynamoDBLeaseSerializer leaseSerializer = this.isMultiStreamMode ? new DynamoDBMultiStreamLeaseSerializer() : new DynamoDBLeaseSerializer();
        this.leaseCoordinator = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, this.isMultiStreamMode).createLeaseCoordinator(this.metricsFactory);
        this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
        this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpointer(this.leaseCoordinator, this.leaseRefresher);
        this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
        this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
        this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
        this.diagnosticEventFactory = diagnosticEventFactory;
        this.diagnosticEventHandler = new DiagnosticEventLogger();
        this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, this.isMultiStreamMode).createShardSyncTaskManager(this.metricsFactory, (StreamConfig)streamConfig);
        this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
        this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
        this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist();
        // Prefer instances supplied directly on the coordinator config; otherwise ask its factory.
        this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator() != null ? coordinatorConfig.gracefulShutdownCoordinator() : this.coordinatorConfig.coordinatorFactory().createGracefulShutdownCoordinator();
        this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener() != null ? coordinatorConfig.workerStateChangeListener() : this.coordinatorConfig.coordinatorFactory().createWorkerStateChangeListener();
        // The leader decider gets its own single-thread scheduled executor. The trailing 1 equals
        // PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT (presumably the constant inlined by the compiler).
        this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(this.leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1);
        this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
        this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
        this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
        this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
        // Shard detector and hierarchical syncer are both obtained from the per-stream
        // ShardSyncTaskManager returned by createOrGetShardSyncTaskManager.
        this.shardDetectorProvider = streamConfig -> this.createOrGetShardSyncTaskManager((StreamConfig)streamConfig).shardDetector();
        this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
        this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
        this.hierarchicalShardSyncerProvider = streamConfig -> this.createOrGetShardSyncTaskManager((StreamConfig)streamConfig).hierarchicalShardSyncer();
        this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
        // Receives the live currentStreamConfigMap instance, not a copy.
        this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), this.leaderDecider, this.leaseRefresher, this.currentStreamConfigMap, this.shardSyncTaskManagerProvider, this.isMultiStreamMode, this.metricsFactory, leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(), leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
        this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, this.isMultiStreamMode).createLeaseCleanupManager(this.metricsFactory);
    }

    /**
     * Worker entry point: initializes the scheduler, then repeatedly runs the process loop until
     * {@link #shouldShutdown()} reports true, and finally performs the final shutdown.
     *
     * <p>Returns immediately if shutdown was already requested. If initialization fails, the
     * state change listener is notified and shutdown is requested; the loop below then drains
     * until {@link #shouldShutdown()} allows exit.
     */
    @Override
    public void run() {
        if (shutdown) {
            return;
        }

        try {
            initialize();
            log.info("Initialization complete. Starting worker loop.");
        } catch (RuntimeException initializationFailure) {
            log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, initializationFailure);
            workerStateChangeListener.onAllInitializationAttemptsFailed(initializationFailure);
            shutdown();
        }

        while (!shouldShutdown()) {
            runProcessLoop();
        }

        finalShutdown();
        log.info("Worker loop is complete. Exiting from worker.");
    }

    /**
     * Initializes the worker while holding {@code lock}, retrying up to
     * {@code maxInitializationAttempts} times.
     *
     * <p>Each attempt initializes the lease coordinator, optionally performs a one-off shard sync
     * (skipped when configured to do so and the lease table is not empty), starts the lease
     * cleanup manager, starts the lease coordinator if it is not running, and starts the periodic
     * shard sync manager and the stream sync stopwatch. After a failed attempt the method sleeps
     * for {@code schedulerInitializationBackoffTimeMillis} and stops the periodic shard sync
     * manager before retrying.
     *
     * @throws RuntimeException wrapping the last failure when no attempt succeeded
     */
    @VisibleForTesting
    void initialize() {
        synchronized (this.lock) {
            this.registerErrorHandlerForUndeliverableAsyncTaskExceptions();
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
            boolean isDone = false;
            Exception lastException = null;
            for (int attempt = 1; !isDone && attempt <= this.maxInitializationAttempts; ++attempt) {
                try {
                    log.info("Initialization attempt {}", attempt);
                    log.info("Initializing LeaseCoordinator");
                    this.leaseCoordinator.initialize();
                    if (!this.skipShardSyncAtWorkerInitializationIfLeasesExist || this.leaseRefresher.isLeaseTableEmpty()) {
                        if (this.shouldInitiateLeaseSync()) {
                            log.info("Worker {} is initiating the lease sync.", this.leaseManagementConfig.workerIdentifier());
                            this.leaderElectedPeriodicShardSyncManager.syncShardsOnce();
                        }
                    } else {
                        log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
                    }
                    this.leaseCleanupManager.start();
                    if (!this.leaseCoordinator.isRunning()) {
                        log.info("Starting LeaseCoordinator");
                        this.leaseCoordinator.start();
                    } else {
                        log.info("LeaseCoordinator is already running. No need to start it.");
                    }
                    log.info("Scheduling periodicShardSync");
                    this.leaderElectedPeriodicShardSyncManager.start();
                    // Must stay last: the stopwatch is only started once per successful attempt.
                    this.streamSyncWatch.start();
                    isDone = true;
                } catch (LeasingException e) {
                    log.error("Caught exception when initializing LeaseCoordinator", e);
                    lastException = e;
                } catch (Exception e) {
                    // Previously swallowed silently; without this, intermediate failures are
                    // invisible unless every attempt fails. This branch also receives an
                    // InterruptedException raised by shouldInitiateLeaseSync(); the retry
                    // behaviour for that case is intentionally left unchanged.
                    log.error("Caught exception during initialization attempt {}", attempt, e);
                    lastException = e;
                }
                if (!isDone) {
                    try {
                        Thread.sleep(this.schedulerInitializationBackoffTimeMillis);
                        // Only reached when the backoff sleep completes normally.
                        this.leaderElectedPeriodicShardSyncManager.stop();
                    } catch (InterruptedException e) {
                        log.debug("Sleep interrupted while initializing worker.");
                    }
                }
            }
            if (!isDone) {
                throw new RuntimeException(lastException);
            }
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
        }
    }

    /**
     * Decides whether this worker should perform the initial lease sync.
     *
     * <p>Waits for a random duration between
     * {@code MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS} (inclusive) and
     * {@code MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS} (exclusive), polling the lease table
     * every {@code LEASE_TABLE_CHECK_FREQUENCY_MILLIS}. As soon as the table is found non-empty
     * the method returns {@code false}; if the wait elapses with the table still empty it
     * returns {@code true}.
     *
     * @return {@code true} if the lease table was still empty when the wait ended
     * @throws InterruptedException if interrupted while sleeping between checks
     * @throws DependencyException, ProvisionedThroughputException, InvalidStateException
     *         propagated from the lease table emptiness check
     */
    @VisibleForTesting
    boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
        long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
        long waitUntil = System.currentTimeMillis() + waitTime;
        while (System.currentTimeMillis() < waitUntil) {
            if (!this.leaseRefresher.isLeaseTableEmpty()) {
                // Another worker has populated the table; no need to sync from here.
                return false;
            }
            log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
            Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
        }
        return true;
    }

    /**
     * Runs one pass of the worker loop: drives the lifecycle of a consumer for every assigned
     * shard, cleans up consumers for shards that are no longer assigned, lets the leader sync
     * streams and leases, logs executor state, and then sleeps for the dispatch poll interval.
     *
     * <p>Any exception (including an interrupt of the sleep) is logged and followed by another
     * sleep of the same length; it never propagates to the caller.
     */
    @VisibleForTesting
    void runProcessLoop() {
        final long pollIntervalMillis = shardConsumerDispatchPollIntervalMillis;
        try {
            HashSet<ShardInfo> activeShards = new HashSet<>();
            for (ShardInfo assigned : getShardInfoForAssignments()) {
                createOrGetShardConsumer(assigned, processorConfig.shardRecordProcessorFactory(), leaseCleanupManager)
                        .executeLifecycle();
                activeShards.add(assigned);
            }
            cleanupShardConsumers(activeShards);

            if (isLeader()) {
                checkAndSyncStreamShardsAndLeases();
            }

            logExecutorState();
            slog.info("Sleeping ...");
            Thread.sleep(pollIntervalMillis);
        } catch (Exception loopFailure) {
            log.error("Worker.run caught exception, sleeping for {} milli seconds!", String.valueOf(pollIntervalMillis), loopFailure);
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException interrupted) {
                log.info("Worker: sleep interrupted after catching exception ", interrupted);
            }
        }
        slog.resetInfoLogging();
    }

    /**
     * Asks the leader decider whether this worker (identified by the lease management config's
     * worker identifier) is currently a leader.
     */
    private boolean isLeader() {
        return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier());
    }

    /**
     * Reconciles the streams reported by the multi-stream tracker with the streams this worker
     * currently tracks. Does nothing (and returns an empty set) unless
     * {@link #shouldSyncStreamsNow()} is true.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>On the first run, seed the tracked streams from the lease table.</li>
     *   <li>For every stream the tracker reports that is not tracked yet, submit a shard sync
     *       task and start tracking it.</li>
     *   <li>Depending on the lease deletion strategy type, either enqueue streams that are no
     *       longer reported for deferred lease deletion, or (for any other type) simply stop
     *       tracking them without touching their leases.</li>
     *   <li>Delete the leases of enqueued streams whose wait period has elapsed, and drop from
     *       the queue any stream that the tracker reports again.</li>
     *   <li>Restart the sync stopwatch and emit stream count metrics.</li>
     * </ol>
     *
     * @return identifiers of streams that were newly synced, dropped from tracking, or whose
     *         leases were deleted during this call
     * @throws DependencyException, ProvisionedThroughputException, InvalidStateException
     *         propagated from listing leases
     */
    @VisibleForTesting
    Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases() throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        Set<StreamIdentifier> streamsSynced = new HashSet<>();
        if (!this.shouldSyncStreamsNow()) {
            return streamsSynced;
        }
        MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, MULTI_STREAM_TRACKER);
        try {
            Duration waitPeriodToDeleteOldStreams = this.formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();
            // Snapshot of what the tracker reports right now; only read from here on.
            Map<StreamIdentifier, StreamConfig> newStreamConfigMap = this.multiStreamTracker.streamConfigList().stream()
                    .collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));

            if (!this.leasesSyncedOnAppInit && this.isMultiStreamMode) {
                List<MultiStreamLease> leases = this.fetchMultiStreamLeases();
                this.syncStreamsFromLeaseTableOnAppInit(leases);
                this.leasesSyncedOnAppInit = true;
            }

            // Start tracking (and shard-syncing) streams that are new to this worker.
            for (Map.Entry<StreamIdentifier, StreamConfig> reported : newStreamConfigMap.entrySet()) {
                StreamIdentifier streamIdentifier = reported.getKey();
                if (!this.currentStreamConfigMap.containsKey(streamIdentifier)) {
                    log.info("Found new stream to process: {}. Syncing shards of that stream.", streamIdentifier);
                    ShardSyncTaskManager shardSyncTaskManager = this.createOrGetShardSyncTaskManager(reported.getValue());
                    shardSyncTaskManager.submitShardSyncTask();
                    this.currentStreamConfigMap.put(streamIdentifier, reported.getValue());
                    streamsSynced.add(streamIdentifier);
                } else {
                    log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
                }
            }

            // Records the first time a stream was seen missing from the tracker's report;
            // putIfAbsent keeps the original timestamp across subsequent syncs.
            Consumer<StreamIdentifier> enqueueStreamLeaseDeletionOperation = streamIdentifier -> {
                if (!newStreamConfigMap.containsKey(streamIdentifier)) {
                    this.staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
                }
            };
            if (this.formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
                this.currentStreamConfigMap.keySet().forEach(enqueueStreamLeaseDeletionOperation);
            } else if (this.formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
                Optional.ofNullable(this.formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup())
                        .ifPresent(streamIdentifiers -> streamIdentifiers.forEach(enqueueStreamLeaseDeletionOperation));
            } else {
                // No lease cleanup for this strategy type: just stop tracking vanished streams.
                Iterator<StreamIdentifier> currentSetOfStreamsIter = this.currentStreamConfigMap.keySet().iterator();
                while (currentSetOfStreamsIter.hasNext()) {
                    StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
                    if (newStreamConfigMap.containsKey(streamIdentifier)) {
                        continue;
                    }
                    log.info("Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases, as part of this workflow", streamIdentifier);
                    currentSetOfStreamsIter.remove();
                    streamsSynced.add(streamIdentifier);
                }
            }

            // true  -> stream is reported again and must be revived,
            // false -> stream is still missing and is a deletion candidate.
            // The partition copies the keys, so the map may be mutated while these sets are used.
            Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = this.staleStreamDeletionMap.keySet().stream()
                    .collect(Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
            Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream()
                    .filter(streamIdentifier -> Duration.between(this.staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis())
                    .collect(Collectors.toSet());
            Set<StreamIdentifier> deletedStreamsLeases = this.deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
            streamsSynced.addAll(deletedStreamsLeases);
            Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
            this.removeStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);
            log.warn("Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ", this.staleStreamDeletionMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams))));
            this.streamSyncWatch.reset().start();
            MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
            MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, this.staleStreamDeletionMap.size(), MetricsLevel.SUMMARY);
            MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
        } finally {
            MetricsUtil.endScope(metricsScope);
        }
        return streamsSynced;
    }

    /**
     * Tells whether a stream/lease sync is due: only in multi-stream mode, and only once more
     * than {@code NEW_STREAM_CHECK_INTERVAL_MILLIS} have elapsed on the sync stopwatch.
     *
     * @return {@code true} if a sync should run now
     */
    @VisibleForTesting
    boolean shouldSyncStreamsNow() {
        return this.isMultiStreamMode && this.streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS;
    }

    /**
     * Starts tracking every stream that has at least one lease in {@code leases} but is not
     * tracked yet, using the default stream config for it. Streams already tracked keep their
     * existing config.
     *
     * @param leases leases read from the lease table
     */
    private void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
        // Collect to a set first so each stream is considered once, however many leases it has.
        Set<StreamIdentifier> streamIdentifiers = leases.stream()
                .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
                .collect(Collectors.toSet());
        for (StreamIdentifier streamIdentifier : streamIdentifiers) {
            if (!this.currentStreamConfigMap.containsKey(streamIdentifier)) {
                this.currentStreamConfigMap.put(streamIdentifier, this.getDefaultStreamConfig(streamIdentifier));
            }
        }
    }

    /**
     * Lists all leases through the lease coordinator's lease refresher.
     *
     * <p>NOTE(review): the declared return type relies on {@code listLeases()} being assignable
     * to {@code List<MultiStreamLease>}; confirm against the refresher's signature.
     */
    private List<MultiStreamLease> fetchMultiStreamLeases() throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        return leaseCoordinator.leaseRefresher().listLeases();
    }

    /**
     * Drops each of the given streams from the pending-deletion map; streams that are not in
     * the map are ignored.
     */
    private void removeStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdentifiers) {
        streamIdentifiers.forEach(staleStreamDeletionMap::remove);
    }

    /**
     * Deletes all leases belonging to the given streams and, for each stream whose leases were
     * all deleted (or that had none), removes it from the tracked streams and from the
     * pending-deletion map.
     *
     * <p>The lease table is listed once, and only when there is at least one stream to process.
     *
     * @param streamIdentifiers streams whose leases should be deleted
     * @return the streams that were fully cleaned up; streams with a failed deletion are left
     *         in place for a later retry
     * @throws DependencyException, ProvisionedThroughputException, InvalidStateException
     *         propagated from listing leases
     */
    private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        Set<StreamIdentifier> streamsSynced = new HashSet<>();
        if (streamIdentifiers.isEmpty()) {
            // Avoid listing the lease table when there is nothing to delete.
            return streamsSynced;
        }
        List<MultiStreamLease> leases = this.fetchMultiStreamLeases();
        // Keyed by the lease's stream identifier string, matched below via serialize().
        Map<String, List<MultiStreamLease>> streamIdToShardsMap = leases.stream()
                .collect(Collectors.groupingBy(MultiStreamLease::streamIdentifier));
        for (StreamIdentifier streamIdentifier : streamIdentifiers) {
            log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier);
            if (!this.deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) {
                continue;
            }
            this.currentStreamConfigMap.remove(streamIdentifier);
            this.staleStreamDeletionMap.remove(streamIdentifier);
            streamsSynced.add(streamIdentifier);
        }
        return streamsSynced;
    }

    /**
     * Deletes the given leases one by one, stopping at the first failure.
     *
     * @param leases leases to delete; {@code null} is treated as "nothing to delete"
     * @return {@code true} if every lease was deleted (or there were none), {@code false} as
     *         soon as one deletion fails
     */
    private boolean deleteMultiStreamLeases(List<MultiStreamLease> leases) {
        if (leases == null) {
            return true;
        }
        for (MultiStreamLease staleLease : leases) {
            try {
                leaseRefresher.deleteLease(staleLease);
            } catch (DependencyException | InvalidStateException | ProvisionedThroughputException deletionFailure) {
                log.error("Unable to delete stale stream lease {}. Skipping further deletions for this stream. Will retry later.", staleLease.leaseKey(), deletionFailure);
                return false;
            }
        }
        return true;
    }

    /**
     * Builds the fallback config for a stream: the given identifier with an initial position of
     * {@link InitialPositionInStream#LATEST}.
     */
    private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) {
        InitialPositionInStreamExtended latest = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
        return new StreamConfig(streamIdentifier, latest);
    }

    /**
     * Decides whether the worker loop may exit.
     *
     * @return {@code true} if the executor service is already shut down, or if shutdown was
     *         requested and either no shard consumers remain or the lease failover time has
     *         elapsed since shutdown started; {@code false} otherwise
     */
    @VisibleForTesting
    boolean shouldShutdown() {
        if (executorService.isShutdown()) {
            log.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
            return true;
        }
        if (!shutdown) {
            return false;
        }
        if (shardInfoShardConsumerMap.isEmpty()) {
            log.info("All record processors have been shutdown successfully.");
            return true;
        }
        long millisSinceShutdownStart = System.currentTimeMillis() - shutdownStartTimeMillis;
        if (millisSinceShutdownStart >= failoverTimeMillis) {
            log.info("Lease failover time is reached, so forcing shutdown.");
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a graceful shutdown the first time it is called and returns the same future on every
     * later call.
     *
     * <p>The future is created while holding this scheduler's monitor, so concurrent callers do not
     * start the shutdown twice.
     *
     * @return the future produced by the graceful shutdown coordinator
     */
    public Future<Boolean> startGracefulShutdown() {
        synchronized (this) {
            if (this.gracefulShutdownFuture == null) {
                this.gracefulShutdownFuture = this.gracefulShutdownCoordinator.startGracefulShutdown(this.createGracefulShutdownCallable());
            }
            return this.gracefulShutdownFuture;
        }
    }

    /**
     * Creates the task that performs a graceful shutdown.
     *
     * @return a task that immediately yields {@code true} when {@link #shutdownComplete()} is already
     *         {@code true}; otherwise the task built by the graceful shutdown coordinator around the
     *         worker shutdown callable
     */
    public Callable<Boolean> createGracefulShutdownCallable() {
        if (!this.shutdownComplete()) {
            Callable<GracefulShutdownContext> workerShutdown = this.createWorkerShutdownCallable();
            return this.gracefulShutdownCoordinator.createGracefulShutdownCallable(workerShutdown);
        }
        // Shutdown already finished: nothing left to coordinate.
        return () -> true;
    }

    /**
     * Reports whether the worker shutdown callable has already been run.
     *
     * @return the current value of the {@code gracefuleShutdownStarted} flag
     */
    public boolean hasGracefulShutdownStarted() {
        return this.gracefuleShutdownStarted;
    }

    /**
     * Creates the callable that begins a graceful shutdown of this worker.
     *
     * <p>When invoked, the callable:
     * <ol>
     *   <li>marks the graceful shutdown as started, failing with {@link IllegalStateException} if it
     *       was already started;</li>
     *   <li>stops the lease taker and reads the current lease assignments;</li>
     *   <li>if there are no assignments, calls {@link #shutdown()} and returns
     *       {@code GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED};</li>
     *   <li>otherwise hands a shutdown notification to each lease's shard consumer. Leases without a
     *       registered consumer count both latches down immediately.</li>
     * </ol>
     *
     * @return a callable yielding the context that carries the two latches (each initialised to the
     *         number of leases) and this scheduler
     */
    @VisibleForTesting
    Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
        return () -> {
            synchronized (this) {
                if (this.gracefuleShutdownStarted) {
                    throw new IllegalStateException("Requested shutdown has already been started");
                }
                this.gracefuleShutdownStarted = true;
            }
            this.leaseCoordinator.stopLeaseTaker();
            Collection<Lease> heldLeases = this.leaseCoordinator.getAssignments();
            if (heldLeases == null || heldLeases.isEmpty()) {
                this.shutdown();
                return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED;
            }
            CountDownLatch shutdownCompleteLatch = new CountDownLatch(heldLeases.size());
            CountDownLatch notificationCompleteLatch = new CountDownLatch(heldLeases.size());
            for (Lease heldLease : heldLeases) {
                ShardConsumerShutdownNotification notification = new ShardConsumerShutdownNotification(this.leaseCoordinator, heldLease, notificationCompleteLatch, shutdownCompleteLatch);
                ShardInfo shardInfo = DynamoDBLeaseCoordinator.convertLeaseToAssignment(heldLease);
                ShardConsumer consumer = this.shardInfoShardConsumerMap.get(shardInfo);
                if (consumer == null) {
                    // No consumer to notify for this lease, so release both latches for it right away.
                    notificationCompleteLatch.countDown();
                    shutdownCompleteLatch.countDown();
                } else {
                    consumer.gracefulShutdown(notification);
                }
            }
            return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Requests shutdown of this worker.
     *
     * <p>Runs entirely under {@code lock}. The first call notifies the state listener with
     * {@code SHUT_DOWN_STARTED}, sets the shutdown flag and records the start time, stops the lease
     * coordinator and the periodic shard sync manager, and finally notifies {@code SHUT_DOWN}.
     * Later calls only log a warning.
     */
    public void shutdown() {
        synchronized (this.lock) {
            if (this.shutdown) {
                log.warn("Shutdown requested a second time.");
                return;
            }
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN_STARTED);
            log.info("Worker shutdown requested.");

            // Flag and timestamp are read by shouldShutdown() to detect the request and its age.
            this.shutdown = true;
            this.shutdownStartTimeMillis = System.currentTimeMillis();

            this.leaseCoordinator.stop();
            this.leaderElectedPeriodicShardSyncManager.stop();
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
        }
    }

    /**
     * Performs the last shutdown steps and marks shutdown as complete.
     *
     * <p>The executor service is stopped with {@code shutdownNow()} only when it is a
     * {@code SchedulerCoordinatorFactory.SchedulerThreadPoolExecutor}, and the metrics factory is shut
     * down only when it is a {@code CloudWatchMetricsFactory}; other implementations are left alone.
     */
    private void finalShutdown() {
        log.info("Starting worker's final shutdown.");

        boolean schedulerPoolExecutor = this.executorService instanceof SchedulerCoordinatorFactory.SchedulerThreadPoolExecutor;
        if (schedulerPoolExecutor) {
            this.executorService.shutdownNow();
        }

        boolean cloudWatchMetrics = this.metricsFactory instanceof CloudWatchMetricsFactory;
        if (cloudWatchMetrics) {
            ((CloudWatchMetricsFactory) this.metricsFactory).shutdown();
        }

        this.shutdownComplete = true;
    }

    /**
     * Fetches the current assignments from the lease coordinator, runs them through the shard
     * prioritization, and logs the outcome through the throttled scheduler log.
     *
     * @return the prioritized shard list exactly as returned by the prioritization (may be
     *         {@code null} or empty)
     */
    private List<ShardInfo> getShardInfoForAssignments() {
        List<ShardInfo> assignedStreamShards = this.leaseCoordinator.getCurrentAssignments();
        List<ShardInfo> prioritizedShards = this.shardPrioritization.prioritize(assignedStreamShards);
        if (prioritizedShards == null || prioritizedShards.isEmpty()) {
            this.slog.info("No activities assigned");
            return prioritizedShards;
        }
        // Only build the (potentially long) key list when the throttled log will actually emit it.
        if (this.slog.isInfoEnabled()) {
            String leaseKeys = prioritizedShards.stream()
                    .map(shard -> ShardInfo.getLeaseKey(shard))
                    .collect(Collectors.joining(", "));
            this.slog.info("Current stream shard assignments: " + leaseKeys);
        }
        return prioritizedShards;
    }

    /**
     * Returns the consumer registered for {@code shardInfo}, creating and registering a new one when
     * none exists or when the existing one is shut down with reason {@code LEASE_LOST}.
     *
     * @param shardInfo shard the consumer is needed for
     * @param shardRecordProcessorFactory factory handed to {@link #buildConsumer} for a new consumer
     * @param leaseCleanupManager cleanup manager handed to {@link #buildConsumer} for a new consumer
     * @return the existing or newly built consumer
     * @throws NullPointerException if any argument is {@code null}
     */
    ShardConsumer createOrGetShardConsumer(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory, @NonNull LeaseCleanupManager leaseCleanupManager) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        if (shardRecordProcessorFactory == null) {
            throw new NullPointerException("shardRecordProcessorFactory");
        }
        if (leaseCleanupManager == null) {
            throw new NullPointerException("leaseCleanupManager");
        }
        ShardConsumer existing = this.shardInfoShardConsumerMap.get(shardInfo);
        boolean reusable = existing != null
                && !(existing.isShutdown() && existing.shutdownReason().equals(ShutdownReason.LEASE_LOST));
        if (reusable) {
            return existing;
        }
        ShardConsumer replacement = this.buildConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
        this.shardInfoShardConsumerMap.put(shardInfo, replacement);
        this.slog.infoForce("Created new shardConsumer for : " + shardInfo);
        return replacement;
    }

    /**
     * Returns the shard sync task manager cached for {@code streamConfig}, asking the provider for a
     * new one (and caching it) when the map has no entry yet.
     *
     * @param streamConfig stream configuration used as the cache key
     * @return the cached or newly provided task manager
     */
    private ShardSyncTaskManager createOrGetShardSyncTaskManager(StreamConfig streamConfig) {
        return this.streamToShardSyncTaskManagerMap.computeIfAbsent(
                streamConfig,
                missingConfig -> this.shardSyncTaskManagerProvider.apply(missingConfig));
    }

    /**
     * Assembles a new {@link ShardConsumer} for the given shard.
     *
     * <p>The stream configuration is looked up in {@code currentStreamConfigMap}; when the stream is
     * not present there, the default configuration from {@code getDefaultStreamConfig} is used.
     *
     * @param shardInfo shard to consume
     * @param shardRecordProcessorFactory factory asked for the record processor of the shard's stream
     * @param leaseCleanupManager cleanup manager passed through to the consumer argument
     * @return a freshly constructed consumer (not registered in any map by this method)
     * @throws NullPointerException if any argument is {@code null}
     */
    protected ShardConsumer buildConsumer(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory, @NonNull LeaseCleanupManager leaseCleanupManager) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        if (shardRecordProcessorFactory == null) {
            throw new NullPointerException("shardRecordProcessorFactory");
        }
        if (leaseCleanupManager == null) {
            throw new NullPointerException("leaseCleanupManager");
        }
        ShardRecordProcessorCheckpointer checkpointer = this.coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, this.checkpoint);
        StreamIdentifier streamIdentifier = this.getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
        StreamConfig streamConfig = this.currentStreamConfigMap.getOrDefault(streamIdentifier, this.getDefaultStreamConfig(streamIdentifier));
        Validate.notNull(streamConfig, "StreamConfig should not be null");
        RecordsPublisher cache = this.retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, this.metricsFactory);
        // Arguments are listed one per line in constructor order; keep the order when editing.
        ShardConsumerArgument argument = new ShardConsumerArgument(
                shardInfo,
                streamConfig.streamIdentifier(),
                this.leaseCoordinator,
                this.executorService,
                cache,
                shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier),
                this.checkpoint,
                checkpointer,
                this.parentShardPollIntervalMillis,
                this.taskBackoffTimeMillis,
                this.skipShardSyncAtWorkerInitializationIfLeasesExist,
                this.listShardsBackoffTimeMillis,
                this.maxListShardsRetryAttempts,
                this.processorConfig.callProcessRecordsEvenForEmptyRecordList(),
                this.shardConsumerDispatchPollIntervalMillis,
                streamConfig.initialPositionInStreamExtended(),
                this.cleanupLeasesUponShardCompletion,
                this.ignoreUnexpetedChildShards,
                this.shardDetectorProvider.apply(streamConfig),
                this.aggregatorUtil,
                this.hierarchicalShardSyncerProvider.apply(streamConfig),
                this.metricsFactory,
                leaseCleanupManager);
        return new ShardConsumer(
                cache,
                this.executorService,
                shardInfo,
                this.lifecycleConfig.logWarningForTaskAfterMillis(),
                argument,
                this.lifecycleConfig.taskExecutionListener(),
                this.lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
    }

    /**
     * Handles consumers whose shard is no longer in {@code assignedShards}.
     *
     * <p>For each such consumer, {@code leaseLost()} is invoked: when it returns {@code true} the
     * consumer is removed from the consumer map, otherwise its lifecycle is executed once more.
     * Consumers of still-assigned shards are not touched.
     *
     * @param assignedShards shards currently assigned to this worker
     */
    void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
        for (ShardInfo trackedShard : this.shardInfoShardConsumerMap.keySet()) {
            if (!assignedShards.contains(trackedShard)) {
                ShardConsumer unassignedConsumer = this.shardInfoShardConsumerMap.get(trackedShard);
                if (unassignedConsumer.leaseLost()) {
                    this.shardInfoShardConsumerMap.remove(trackedShard);
                    log.debug("Removed consumer for {} as lease has been lost", ShardInfo.getLeaseKey(trackedShard));
                } else {
                    unassignedConsumer.executeLifecycle();
                }
            }
        }
    }

    /**
     * Installs an RxJava error handler that turns each received throwable into a rejected-task
     * diagnostic event (including a snapshot of the executor state) and passes it to the diagnostic
     * event handler.
     */
    private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() {
        RxJavaPlugins.setErrorHandler(undeliverable -> {
            ExecutorStateEvent executorState = this.diagnosticEventFactory.executorStateEvent(this.executorService, this.leaseCoordinator);
            this.diagnosticEventFactory
                    .rejectedTaskEvent(executorState, undeliverable)
                    .accept(this.diagnosticEventHandler);
        });
    }

    /**
     * Creates an executor-state diagnostic event from the executor service and lease coordinator and
     * hands it to the diagnostic event handler.
     */
    private void logExecutorState() {
        this.diagnosticEventFactory
                .executorStateEvent(this.executorService, this.leaseCoordinator)
                .accept(this.diagnosticEventHandler);
    }

    /**
     * Resolves the stream identifier for an optional serialized identifier.
     *
     * <p>When a serialized identifier is present it is parsed as a multi-stream identifier. When it
     * is absent the scheduler must not be in multi-stream mode, and the identifier of the first
     * entry in {@code currentStreamConfigMap} is used.
     *
     * @param streamIdentifierString serialized stream identifier, if any
     * @return the resolved, non-null stream identifier
     */
    private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
        final StreamIdentifier resolved;
        if (!streamIdentifierString.isPresent()) {
            Validate.isTrue(!this.isMultiStreamMode, "Should not be in MultiStream Mode");
            resolved = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();
        } else {
            resolved = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
        }
        Validate.notNull(resolved, "Stream identifier should not be empty");
        return resolved;
    }

    /**
     * Deprecated no-op: performs no shutdown work.
     *
     * @return always {@code null}; callers must not dereference the result
     * @deprecated this method does nothing; see {@link #startGracefulShutdown()} for a shutdown entry
     *             point that returns a usable future
     */
    @Deprecated
    public Future<Void> requestShutdown() {
        return null;
    }

    /** Returns the throttled scheduler log used by this scheduler for its periodic INFO output. */
    public SchedulerLog slog() {
        return this.slog;
    }

    /** Returns the checkpoint configuration held by this scheduler. */
    public CheckpointConfig checkpointConfig() {
        return this.checkpointConfig;
    }

    /** Returns the coordinator configuration held by this scheduler. */
    public CoordinatorConfig coordinatorConfig() {
        return this.coordinatorConfig;
    }

    /** Returns the lease management configuration held by this scheduler. */
    public LeaseManagementConfig leaseManagementConfig() {
        return this.leaseManagementConfig;
    }

    /** Returns the lifecycle configuration held by this scheduler. */
    public LifecycleConfig lifecycleConfig() {
        return this.lifecycleConfig;
    }

    /** Returns the metrics configuration held by this scheduler. */
    public MetricsConfig metricsConfig() {
        return this.metricsConfig;
    }

    /** Returns the processor configuration held by this scheduler. */
    public ProcessorConfig processorConfig() {
        return this.processorConfig;
    }

    /** Returns the retrieval configuration held by this scheduler. */
    public RetrievalConfig retrievalConfig() {
        return this.retrievalConfig;
    }

    /** Returns the application name held by this scheduler. */
    public String applicationName() {
        return this.applicationName;
    }

    /** Returns the value of the {@code maxInitializationAttempts} setting. */
    public int maxInitializationAttempts() {
        return this.maxInitializationAttempts;
    }

    /** Returns the checkpointer that this scheduler passes to the shard consumers it builds. */
    public Checkpointer checkpoint() {
        return this.checkpoint;
    }

    /** Returns the shard consumer dispatch poll interval, in milliseconds. */
    public long shardConsumerDispatchPollIntervalMillis() {
        return this.shardConsumerDispatchPollIntervalMillis;
    }

    /** Returns the parent shard poll interval, in milliseconds. */
    public long parentShardPollIntervalMillis() {
        return this.parentShardPollIntervalMillis;
    }

    /** Returns the executor service that this scheduler hands to the shard consumers it builds. */
    public ExecutorService executorService() {
        return this.executorService;
    }

    /** Returns the factory this scheduler uses to create diagnostic events. */
    public DiagnosticEventFactory diagnosticEventFactory() {
        return this.diagnosticEventFactory;
    }

    /** Returns the handler that receives the diagnostic events created by this scheduler. */
    public DiagnosticEventHandler diagnosticEventHandler() {
        return this.diagnosticEventHandler;
    }

    /** Returns the lease coordinator from which this scheduler reads its lease assignments. */
    public LeaseCoordinator leaseCoordinator() {
        return this.leaseCoordinator;
    }

    /** Returns the function that supplies a shard sync task manager for a given stream configuration. */
    public Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider() {
        return this.shardSyncTaskManagerProvider;
    }

    /**
     * Returns the cache of shard sync task managers keyed by stream configuration.
     *
     * <p>The internal map itself is returned, not a copy.
     */
    public Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap() {
        return this.streamToShardSyncTaskManagerMap;
    }

    /** Returns the periodic shard sync manager, which {@link #shutdown()} stops. */
    public PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager() {
        return this.leaderElectedPeriodicShardSyncManager;
    }

    /** Returns the strategy used to prioritize the currently assigned shards. */
    public ShardPrioritization shardPrioritization() {
        return this.shardPrioritization;
    }

    /** Returns the value of the {@code cleanupLeasesUponShardCompletion} setting. */
    public boolean cleanupLeasesUponShardCompletion() {
        return this.cleanupLeasesUponShardCompletion;
    }

    /** Returns the value of the {@code skipShardSyncAtWorkerInitializationIfLeasesExist} setting. */
    public boolean skipShardSyncAtWorkerInitializationIfLeasesExist() {
        return this.skipShardSyncAtWorkerInitializationIfLeasesExist;
    }

    /** Returns the coordinator that builds and starts the graceful shutdown task. */
    public GracefulShutdownCoordinator gracefulShutdownCoordinator() {
        return this.gracefulShutdownCoordinator;
    }

    /** Returns the listener that is notified of this worker's state changes. */
    public WorkerStateChangeListener workerStateChangeListener() {
        return this.workerStateChangeListener;
    }

    /** Returns the metrics factory held by this scheduler. */
    public MetricsFactory metricsFactory() {
        return this.metricsFactory;
    }

    /**
     * Returns the failover time in milliseconds.
     *
     * <p>{@code shouldShutdown()} forces shutdown once this much time has passed since shutdown was
     * requested.
     */
    public long failoverTimeMillis() {
        return this.failoverTimeMillis;
    }

    /** Returns the task backoff time, in milliseconds. */
    public long taskBackoffTimeMillis() {
        return this.taskBackoffTimeMillis;
    }

    /** Returns {@code true} when this scheduler is in multi-stream mode. */
    public boolean isMultiStreamMode() {
        return this.isMultiStreamMode;
    }

    /**
     * Returns the stream configurations currently tracked, keyed by stream identifier.
     *
     * <p>The internal map itself is returned, not a copy.
     */
    public Map<StreamIdentifier, StreamConfig> currentStreamConfigMap() {
        return this.currentStreamConfigMap;
    }

    /** Returns the multi-stream tracker held by this scheduler. */
    public MultiStreamTracker multiStreamTracker() {
        return this.multiStreamTracker;
    }

    /** Returns the former-streams lease deletion strategy held by this scheduler. */
    public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
        return this.formerStreamsLeasesDeletionStrategy;
    }

    /** Returns the list-shards backoff time, in milliseconds. */
    public long listShardsBackoffTimeMillis() {
        return this.listShardsBackoffTimeMillis;
    }

    /** Returns the value of the {@code maxListShardsRetryAttempts} setting. */
    public int maxListShardsRetryAttempts() {
        return this.maxListShardsRetryAttempts;
    }

    /** Returns the lease refresher this scheduler uses, for example to delete stale stream leases. */
    public LeaseRefresher leaseRefresher() {
        return this.leaseRefresher;
    }

    /** Returns the function that supplies a shard detector for a given stream configuration. */
    public Function<StreamConfig, ShardDetector> shardDetectorProvider() {
        return this.shardDetectorProvider;
    }

    /**
     * Returns the value of the {@code ignoreUnexpetedChildShards} setting.
     *
     * <p>The misspelling ("Unexpeted") is part of the existing method and field name.
     */
    public boolean ignoreUnexpetedChildShards() {
        return this.ignoreUnexpetedChildShards;
    }

    /** Returns the aggregator utility held by this scheduler. */
    public AggregatorUtil aggregatorUtil() {
        return this.aggregatorUtil;
    }

    /** Returns the function that supplies a hierarchical shard syncer for a given stream configuration. */
    public Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider() {
        return this.hierarchicalShardSyncerProvider;
    }

    /** Returns the scheduler initialization backoff time, in milliseconds. */
    public long schedulerInitializationBackoffTimeMillis() {
        return this.schedulerInitializationBackoffTimeMillis;
    }

    /** Returns the leader decider held by this scheduler. */
    public LeaderDecider leaderDecider() {
        return this.leaderDecider;
    }

    /**
     * Returns the map of stale streams to the instant recorded for each of them.
     *
     * <p>The internal map itself is returned, not a copy.
     */
    public Map<StreamIdentifier, Instant> staleStreamDeletionMap() {
        return this.staleStreamDeletionMap;
    }

    /** Returns the lease cleanup manager held by this scheduler. */
    public LeaseCleanupManager leaseCleanupManager() {
        return this.leaseCleanupManager;
    }

    /**
     * Returns the registry of shard consumers keyed by shard info.
     *
     * <p>The internal map itself is returned, not a copy.
     */
    public ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap() {
        return this.shardInfoShardConsumerMap;
    }

    /** Returns the wall-clock time, in epoch milliseconds, recorded by {@link #shutdown()} when shutdown was requested. */
    public long shutdownStartTimeMillis() {
        return this.shutdownStartTimeMillis;
    }

    /** Returns {@code true} once the final shutdown step has marked shutdown as complete. */
    public boolean shutdownComplete() {
        return this.shutdownComplete;
    }

    /** Returns the monitor object on which {@link #shutdown()} synchronizes. */
    public Object lock() {
        return this.lock;
    }

    /** Returns the stopwatch held in the {@code streamSyncWatch} field. */
    public Stopwatch streamSyncWatch() {
        return this.streamSyncWatch;
    }

    /** Returns the current value of the {@code leasesSyncedOnAppInit} flag. */
    public boolean leasesSyncedOnAppInit() {
        return this.leasesSyncedOnAppInit;
    }

    /**
     * Returns the graceful shutdown future.
     *
     * <p>The future is created by {@link #startGracefulShutdown()}; this accessor returns whatever the
     * field currently holds without starting anything.
     */
    public Future<Boolean> gracefulShutdownFuture() {
        return this.gracefulShutdownFuture;
    }

    /**
     * Returns the {@code gracefuleShutdownStarted} flag; same value as
     * {@link #hasGracefulShutdownStarted()}.
     *
     * <p>The misspelling ("gracefule") is part of the existing method and field name.
     */
    public boolean gracefuleShutdownStarted() {
        return this.gracefuleShutdownStarted;
    }

    /**
     * Throttled INFO logging for the scheduler.
     *
     * <p>{@link #info(Object)} only emits while the {@code infoReporting} flag is set. The flag is
     * toggled by {@code resetInfoLogging()}: it is switched on once {@code nextReportTime} has been
     * reached, and switched off again (with the next report time pushed out by
     * {@code reportIntervalMillis}, one minute) on the following call if the underlying logger has
     * INFO enabled. {@link #infoForce(Object)} bypasses the throttle.
     */
    private static class SchedulerLog {
        /** Minimum spacing between reporting windows, in milliseconds. */
        private long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1L);
        /** Earliest wall-clock time (epoch millis) at which the next reporting window may open. */
        private long nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
        /** Whether throttled INFO messages are currently emitted. */
        private boolean infoReporting;

        private SchedulerLog() {
        }

        /** Logs {@code message} at INFO level, but only while a reporting window is open. */
        void info(Object message) {
            if (!this.isInfoEnabled()) {
                return;
            }
            log.info("{}", message);
        }

        /** Logs {@code message} at INFO level regardless of the reporting window. */
        void infoForce(Object message) {
            log.info("{}", message);
        }

        /** Returns whether a reporting window is currently open. */
        private boolean isInfoEnabled() {
            return this.infoReporting;
        }

        /** Advances the throttle state: closes an open window or opens one that has become due. */
        private void resetInfoLogging() {
            if (!this.infoReporting) {
                // Window closed: open it once the scheduled time has arrived.
                if (this.nextReportTime <= System.currentTimeMillis()) {
                    this.infoReporting = true;
                }
                return;
            }
            // Window open: close it and schedule the next one, provided INFO is enabled on the logger.
            if (log.isInfoEnabled()) {
                this.infoReporting = false;
                this.nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
            }
        }
    }
}

