/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.NoticesQueue;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.base.AbstractInstant;

public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
implements StreamSupervisor {
    public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
    public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
    public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = "task/autoScaler/requiredCount";
    public static final String AUTOSCALER_SCALING_TIME_METRIC = "task/autoScaler/scaleActionTime";
    private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000L;
    private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000L;
    private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000L;
    private static final long MAX_RUN_FREQUENCY_MILLIS = 1000L;
    private static final int MAX_INITIALIZATION_RETRIES = 20;
    private static final int MIN_WORKER_CORE_THREADS = 2;
    private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4;
    private static final int WORKER_THREAD_KEEPALIVE_TIME_MILLIS = 2000;
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamSupervisor.class);
    private static final Comparator<ParseExceptionReport> PARSE_EXCEPTION_REPORT_COMPARATOR = Comparator.comparingLong(ParseExceptionReport::getTimeOfExceptionMillis).thenComparing(ParseExceptionReport::getErrorType, StringComparators.LEXICOGRAPHIC).thenComparing(ParseExceptionReport::getInput, StringComparators.LEXICOGRAPHIC);
    private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap();
    protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> partitionOffsets = new ConcurrentHashMap();
    protected final ConcurrentHashMap<Integer, Set<PartitionIdType>> partitionGroups = new ConcurrentHashMap();
    protected final ObjectMapper sortingMapper;
    protected final List<PartitionIdType> partitionIds = new CopyOnWriteArrayList<PartitionIdType>();
    protected final SeekableStreamSupervisorStateManager stateManager;
    protected volatile DateTime sequenceLastUpdated;
    private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    protected final String dataSource;
    private final Set<PartitionIdType> subsequentlyDiscoveredPartitions = new HashSet<PartitionIdType>();
    private final TaskStorage taskStorage;
    private final TaskMaster taskMaster;
    private final SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> taskClient;
    private final SeekableStreamSupervisorSpec spec;
    private final SeekableStreamSupervisorIOConfig ioConfig;
    private final AutoScalerConfig autoScalerConfig;
    private final SeekableStreamSupervisorTuningConfig tuningConfig;
    private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
    private final String supervisorId;
    private final String supervisorTag;
    private final TaskInfoProvider taskInfoProvider;
    private final RowIngestionMetersFactory rowIngestionMetersFactory;
    private final ExecutorService exec;
    private final ScheduledExecutorService scheduledExec;
    private final ScheduledExecutorService reportingExec;
    private final ListeningScheduledExecutorService workerExec;
    private final NoticesQueue<Notice> notices = new NoticesQueue();
    private final Object stopLock = new Object();
    private final Object stateChangeLock = new Object();
    private final ReentrantLock recordSupplierLock = new ReentrantLock();
    private List<ParseExceptionReport> lastKnownParseErrors = new ArrayList<ParseExceptionReport>();
    private final boolean useExclusiveStartingSequence;
    private boolean listenerRegistered = false;
    private long lastRunTime;
    private long dynamicTriggerLastRunTime;
    private int initRetryCounter = 0;
    private volatile DateTime firstRunTime;
    private volatile DateTime earlyStopTime = null;
    protected volatile RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
    private volatile boolean started = false;
    private volatile boolean stopped = false;
    private volatile boolean lifecycleStarted = false;
    private final ServiceEmitter emitter;
    private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<PartitionIdType, SequenceOffsetType>();
    private long lastActiveTimeMillis;
    private final IdleConfig idleConfig;

    private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException {
        Collection<TaskGroup> activeTaskGroups = this.activelyReadingTaskGroups.values();
        int currentActiveTaskCount = activeTaskGroups.size();
        if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
            return false;
        }
        log.info("Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", new Object[]{currentActiveTaskCount, desiredActiveTaskCount, this.supervisorId, this.dataSource});
        Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
        this.gracefulShutdownInternal();
        this.changeTaskCountInIOConfig(desiredActiveTaskCount);
        this.clearAllocationInfo();
        this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimension("stream", (Object)this.getIoConfig().getStream()).setDimensionIfNotNull("tags", this.spec.getContextValue("tags")).setMetric(AUTOSCALER_SCALING_TIME_METRIC, (Number)scaleActionStopwatch.millisElapsed()));
        log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", new Object[]{desiredActiveTaskCount, this.supervisorId, this.dataSource});
        return true;
    }

    private void changeTaskCountInIOConfig(int desiredActiveTaskCount) {
        this.ioConfig.setTaskCount(desiredActiveTaskCount);
        try {
            Optional<SupervisorManager> supervisorManager = this.taskMaster.getSupervisorManager();
            if (supervisorManager.isPresent()) {
                MetadataSupervisorManager metadataSupervisorManager = ((SupervisorManager)supervisorManager.get()).getMetadataSupervisorManager();
                metadataSupervisorManager.insert(this.supervisorId, (SupervisorSpec)this.spec);
            } else {
                log.error("supervisorManager is null in taskMaster, skipping scale action for supervisor[%s] for dataSource[%s].", new Object[]{this.supervisorId, this.dataSource});
            }
        }
        catch (Exception e) {
            log.error((Throwable)e, "Failed to sync taskCount to MetaStorage for supervisor[%s] for dataSource[%s].", new Object[]{this.supervisorId, this.dataSource});
        }
    }

    private void clearAllocationInfo() {
        this.activelyReadingTaskGroups.clear();
        this.partitionGroups.clear();
        this.partitionOffsets.clear();
        this.pendingCompletionTaskGroups.clear();
        this.partitionIds.clear();
    }

    public SeekableStreamSupervisor(String supervisorTag, final TaskStorage taskStorage, final TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, SeekableStreamIndexTaskClientFactory<PartitionIdType, SequenceOffsetType> taskClientFactory, ObjectMapper mapper, SeekableStreamSupervisorSpec spec, RowIngestionMetersFactory rowIngestionMetersFactory, boolean useExclusiveStartingSequence) {
        IdleConfig specIdleConfig;
        this.supervisorTag = (String)Preconditions.checkNotNull((Object)supervisorTag, (Object)"supervisorTag");
        this.taskStorage = taskStorage;
        this.taskMaster = taskMaster;
        this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
        this.sortingMapper = mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
        this.spec = spec;
        this.emitter = spec.getEmitter();
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        this.useExclusiveStartingSequence = useExclusiveStartingSequence;
        this.dataSource = spec.getDataSchema().getDataSource();
        this.ioConfig = spec.getIoConfig();
        this.autoScalerConfig = this.ioConfig.getAutoScalerConfig();
        this.tuningConfig = spec.getTuningConfig();
        this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
        this.supervisorId = spec.getId();
        this.exec = Execs.singleThreaded((String)StringUtils.encodeForFormat((String)supervisorTag));
        this.scheduledExec = Execs.scheduledSingleThreaded((String)(StringUtils.encodeForFormat((String)supervisorTag) + "-Scheduler-%d"));
        this.reportingExec = Execs.scheduledSingleThreaded((String)(StringUtils.encodeForFormat((String)supervisorTag) + "-Reporting-%d"));
        this.stateManager = new SeekableStreamSupervisorStateManager(spec.getSupervisorStateManagerConfig(), spec.isSuspended());
        if (this.autoScalerConfig != null && this.autoScalerConfig.getEnableTaskAutoScaler()) {
            log.info("Running Task autoscaler for supervisor[%s] for datasource[%s]", new Object[]{this.supervisorId, this.dataSource});
        }
        this.idleConfig = (specIdleConfig = spec.getIoConfig().getIdleConfig()) != null ? (specIdleConfig.getInactiveAfterMillis() != null ? specIdleConfig : new IdleConfig(specIdleConfig.isEnabled(), spec.getSupervisorStateManagerConfig().getInactiveAfterMillis())) : new IdleConfig(spec.getSupervisorStateManagerConfig().isIdleConfigEnabled(), spec.getSupervisorStateManagerConfig().getInactiveAfterMillis());
        int workerThreads = SeekableStreamSupervisor.calculateWorkerThreads(this.tuningConfig, this.ioConfig);
        ScheduledThreadPoolExecutor executor = ScheduledExecutors.fixedWithKeepAliveTime((int)workerThreads, (String)(StringUtils.encodeForFormat((String)supervisorTag) + "-Worker-%d"), (long)2000L);
        this.workerExec = MoreExecutors.listeningDecorator((ScheduledExecutorService)executor);
        log.info("Created worker pool with [%d] threads for supervisor[%s], dataSource[%s]", new Object[]{workerThreads, this.supervisorId, this.dataSource});
        this.taskInfoProvider = new TaskInfoProvider(){

            @Override
            public TaskLocation getTaskLocation(String id) {
                Preconditions.checkNotNull((Object)id, (Object)"id");
                Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
                if (taskRunner.isPresent()) {
                    return ((TaskRunner)taskRunner.get()).getTaskLocation(id);
                }
                log.error("Failed to get task runner because I'm not the leader!", new Object[0]);
                return TaskLocation.unknown();
            }

            @Override
            public Optional<TaskStatus> getTaskStatus(String id) {
                Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
                if (taskQueue.isPresent()) {
                    return ((TaskQueue)taskQueue.get()).getTaskStatus(id);
                }
                return taskStorage.getStatus(id);
            }
        };
        this.taskClient = taskClientFactory.build(this.dataSource, this.taskInfoProvider, this.tuningConfig, (ScheduledExecutorService)this.workerExec);
    }

    public static int calculateWorkerThreads(SeekableStreamSupervisorTuningConfig tuningConfig, SeekableStreamSupervisorIOConfig ioConfig) {
        if (tuningConfig.getWorkerThreads() != null) {
            return tuningConfig.getWorkerThreads();
        }
        AutoScalerConfig autoScalerConfig = ioConfig.getAutoScalerConfig();
        if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
            return Math.max(2, autoScalerConfig.getTaskCountMax() / 4);
        }
        return Math.max(2, ioConfig.getTaskCount() / 4);
    }

    public int getActiveTaskGroupsCount() {
        return this.activelyReadingTaskGroups.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        Object object = this.stateChangeLock;
        synchronized (object) {
            block5: {
                Preconditions.checkState((!this.lifecycleStarted ? 1 : 0) != 0, (Object)"already started");
                Preconditions.checkState((!this.exec.isShutdown() ? 1 : 0) != 0, (Object)"already stopped");
                try {
                    this.tryInit();
                }
                catch (Exception e) {
                    if (this.started) break block5;
                    log.warn("First initialization attempt failed for supervisor[%s], dataSource[%s], starting retries...", new Object[]{this.supervisorId, this.dataSource});
                    this.exec.submit(() -> {
                        try {
                            RetryUtils.retry(() -> {
                                this.tryInit();
                                return 0;
                            }, throwable -> !this.started, (int)0, (int)20, null, (String)StringUtils.format((String)"Failed to initialize supervisor[%s]", (Object[])new Object[]{this.supervisorId}));
                        }
                        catch (Exception e2) {
                            log.makeAlert("Failed to initialize after %s retries, aborting. Please resubmit the supervisor spec to restart this supervisor [%s]", new Object[]{20, this.supervisorId}).emit();
                            throw new RuntimeException(e2);
                        }
                    });
                }
            }
            this.lifecycleStarted = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop(boolean stopGracefully) {
        Object object = this.stateChangeLock;
        synchronized (object) {
            Preconditions.checkState((boolean)this.lifecycleStarted, (Object)"lifecycle not started");
            log.info("Beginning shutdown of [%s]", new Object[]{this.supervisorId});
            this.stateManager.maybeSetState((SupervisorStateManager.State)SupervisorStateManager.BasicState.STOPPING);
            try {
                this.scheduledExec.shutdownNow();
                this.reportingExec.shutdownNow();
                if (this.started) {
                    Optional<TaskRunner> taskRunner = this.taskMaster.getTaskRunner();
                    if (taskRunner.isPresent()) {
                        ((TaskRunner)taskRunner.get()).unregisterListener(this.supervisorTag);
                    }
                    Object object2 = this.stopLock;
                    synchronized (object2) {
                        if (stopGracefully) {
                            log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish", new Object[0]);
                            this.addNotice(new GracefulShutdownNotice());
                        } else {
                            log.info("Posting ShutdownNotice", new Object[0]);
                            this.addNotice(new ShutdownNotice());
                        }
                        long shutdownTimeoutMillis = this.tuningConfig.getShutdownTimeout().getMillis();
                        long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
                        while (!this.stopped) {
                            long sleepTime = endTime - System.currentTimeMillis();
                            if (sleepTime <= 0L) {
                                log.info("Timed out while waiting for shutdown (timeout [%,dms])", new Object[]{shutdownTimeoutMillis});
                                this.stopped = true;
                                break;
                            }
                            this.stopLock.wait(sleepTime);
                        }
                    }
                    log.info("Shutdown notice handled", new Object[0]);
                }
                this.taskClient.close();
                this.workerExec.shutdownNow();
                this.exec.shutdownNow();
                this.started = false;
                log.info("[%s] has stopped", new Object[]{this.supervisorId});
            }
            catch (Exception e) {
                this.stateManager.recordThrowableEvent(e);
                log.makeAlert((Throwable)e, "Exception stopping [%s]", new Object[]{this.supervisorId}).emit();
            }
        }
    }

    public ListenableFuture<Void> stopAsync() {
        ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator((ExecutorService)Execs.singleThreaded((String)("supervisor-shutdown-" + StringUtils.encodeForFormat((String)this.supervisorTag) + "--%d")));
        return shutdownExec.submit(() -> {
            this.stop(false);
            shutdownExec.shutdown();
            return null;
        });
    }

    public void reset(@Nullable DataSourceMetadata dataSourceMetadata) {
        log.info("Posting ResetNotice with datasource metadata [%s]", new Object[]{dataSourceMetadata});
        this.addNotice(new ResetNotice(dataSourceMetadata));
    }

    public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) {
        if (resetDataSourceMetadata == null) {
            throw InvalidInput.exception((String)"Reset dataSourceMetadata is required for resetOffsets.", (Object[])new Object[0]);
        }
        if (!this.checkSourceMetadataMatch(resetDataSourceMetadata)) {
            throw InvalidInput.exception((String)"Datasource metadata instance does not match required, found instance of [%s].", (Object[])new Object[]{resetDataSourceMetadata.getClass()});
        }
        SeekableStreamDataSourceMetadata resetMetadata = (SeekableStreamDataSourceMetadata)resetDataSourceMetadata;
        SeekableStreamSequenceNumbers streamSequenceNumbers = resetMetadata.getSeekableStreamSequenceNumbers();
        if (!(streamSequenceNumbers instanceof SeekableStreamEndSequenceNumbers)) {
            throw InvalidInput.exception((String)"Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[%s], but found[%s].", (Object[])new Object[]{resetMetadata, SeekableStreamEndSequenceNumbers.class.getSimpleName(), streamSequenceNumbers.getClass().getSimpleName()});
        }
        String resetStream = streamSequenceNumbers.getStream();
        if (!this.ioConfig.getStream().equals(resetStream)) {
            throw InvalidInput.exception((String)"Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is consuming stream[%s].", (Object[])new Object[]{resetStream, this.supervisorId, this.ioConfig.getStream()});
        }
        log.info("Posting ResetOffsetsNotice with reset dataSource metadata[%s]", new Object[]{resetDataSourceMetadata});
        this.addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
    }

    public void registerNewVersionOfPendingSegment(PendingSegmentRecord pendingSegmentRecord) {
        for (TaskGroup taskGroup : this.activelyReadingTaskGroups.values()) {
            if (!taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) continue;
            for (String taskId : taskGroup.taskIds()) {
                this.taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord);
            }
        }
        for (List list : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup taskGroup : list) {
                if (!taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) continue;
                for (String taskId : taskGroup.taskIds()) {
                    this.taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord);
                }
            }
        }
    }

    public ReentrantLock getRecordSupplierLock() {
        return this.recordSupplierLock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void tryInit() {
        Object object = this.stateChangeLock;
        synchronized (object) {
            if (this.started) {
                log.warn("Supervisor[%s] was already started, skipping init", new Object[]{this.supervisorId});
                return;
            }
            if (this.stopped) {
                log.warn("Supervisor[%s] was already stopped, skipping init.", new Object[]{this.supervisorId});
                return;
            }
            try {
                this.recordSupplier = this.setupRecordSupplier();
                this.exec.submit(() -> {
                    try {
                        long pollTimeout = Math.max(this.ioConfig.getPeriod().getMillis(), 1000L);
                        while (!Thread.currentThread().isInterrupted() && !this.stopped) {
                            Notice notice = this.notices.poll(pollTimeout);
                            if (notice == null) continue;
                            try {
                                Stopwatch noticeHandleTime = Stopwatch.createStarted();
                                notice.handle();
                                String noticeType = notice.getType();
                                this.emitNoticeProcessTime(noticeType, noticeHandleTime.millisElapsed());
                                if (!log.isDebugEnabled()) continue;
                                log.debug("Handled notice[%s] from notices queue in [%d] ms, current notices queue size [%d] for supervisor[%s] for datasource[%s].", new Object[]{noticeType, noticeHandleTime.millisElapsed(), this.getNoticesQueueSize(), this.supervisorId, this.dataSource});
                            }
                            catch (Throwable e) {
                                this.stateManager.recordThrowableEvent(e);
                                log.makeAlert(e, "Supervisor[%s] for datasource[%s] failed to handle notice", new Object[]{this.supervisorId, this.dataSource}).addData("noticeClass", (Object)notice.getClass().getSimpleName()).emit();
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        this.stateManager.recordThrowableEvent(e);
                        log.info("SeekableStreamSupervisor[%s] interrupted, exiting", new Object[]{this.supervisorId});
                    }
                });
                this.firstRunTime = DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getStartDelay());
                this.scheduledExec.scheduleAtFixedRate(this.buildRunTask(), this.ioConfig.getStartDelay().getMillis(), Math.max(this.ioConfig.getPeriod().getMillis(), 1000L), TimeUnit.MILLISECONDS);
                this.scheduleReporting(this.reportingExec);
                this.started = true;
                log.info("Started SeekableStreamSupervisor[%s], first run in [%s], with spec: [%s]", new Object[]{this.supervisorId, this.ioConfig.getStartDelay(), this.spec.toString()});
            }
            catch (Exception e) {
                this.stateManager.recordThrowableEvent(e);
                if (this.recordSupplier != null) {
                    this.recordSupplier.close();
                }
                ++this.initRetryCounter;
                log.makeAlert((Throwable)e, "Exception starting SeekableStreamSupervisor[%s]", new Object[]{this.supervisorId}).emit();
                throw new RuntimeException(e);
            }
        }
    }

    public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, Runnable onSuccessfulScale, ServiceEmitter emitter) {
        return () -> this.addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter));
    }

    private Runnable buildRunTask() {
        return () -> this.addNotice(new RunNotice());
    }

    public SupervisorReport getStatus() {
        return this.generateReport(true);
    }

    public SupervisorStateManager.State getState() {
        return this.stateManager.getSupervisorState();
    }

    public Boolean isHealthy() {
        return this.stateManager.isHealthy();
    }

    private SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> generateReport(boolean includeOffsets) {
        int numPartitions = this.partitionGroups.values().stream().mapToInt(Set::size).sum();
        SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType> payload = this.createReportPayload(numPartitions, includeOffsets);
        SupervisorReport report = new SupervisorReport(this.supervisorId, DateTimes.nowUtc(), payload);
        ArrayList taskReports = new ArrayList();
        try {
            for (TaskGroup taskGroup : this.activelyReadingTaskGroups.values()) {
                for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                    String taskId = entry.getKey();
                    DateTime startTime = entry.getValue().startTime;
                    Map currentOffsets = entry.getValue().currentSequences;
                    Long remainingSeconds = null;
                    if (startTime != null) {
                        long elapsedMillis = System.currentTimeMillis() - startTime.getMillis();
                        long remainingMillis = Math.max(0L, this.ioConfig.getTaskDuration().getMillis() - elapsedMillis);
                        remainingSeconds = TimeUnit.MILLISECONDS.toSeconds(remainingMillis);
                    }
                    taskReports.add(new TaskReportData(taskId, includeOffsets ? taskGroup.startingSequences : null, includeOffsets ? currentOffsets : null, startTime, remainingSeconds, TaskReportData.TaskType.ACTIVE, includeOffsets ? this.getRecordLagPerPartition(currentOffsets) : null, includeOffsets ? this.getTimeLagPerPartition(currentOffsets) : null));
                }
            }
            for (List list : this.pendingCompletionTaskGroups.values()) {
                for (TaskGroup taskGroup : list) {
                    for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                        String taskId = entry.getKey();
                        DateTime startTime = entry.getValue().startTime;
                        Map currentOffsets = entry.getValue().currentSequences;
                        Long remainingSeconds = null;
                        if (taskGroup.completionTimeout != null) {
                            remainingSeconds = Math.max(0L, taskGroup.completionTimeout.getMillis() - System.currentTimeMillis()) / 1000L;
                        }
                        taskReports.add(new TaskReportData(taskId, includeOffsets ? taskGroup.startingSequences : null, includeOffsets ? currentOffsets : null, startTime, remainingSeconds, TaskReportData.TaskType.PUBLISHING, null, null));
                    }
                }
            }
            taskReports.forEach(payload::addTask);
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Failed to generate status report", new Object[0]);
        }
        return report;
    }

    public Map<String, Map<String, Object>> getStats() {
        try {
            return this.getCurrentTotalStats();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.warn((Throwable)ie, "getStats() interrupted.", new Object[0]);
            throw new RuntimeException(ie);
        }
        catch (ExecutionException ee) {
            throw new RuntimeException(ee);
        }
    }

    public List<ParseExceptionReport> getParseErrors() {
        try {
            if (this.spec.getSpec().getTuningConfig().convertToTaskTuningConfig().getMaxParseExceptions() <= 0) {
                return ImmutableList.of();
            }
            this.lastKnownParseErrors = this.getCurrentParseErrors();
            return this.lastKnownParseErrors;
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.warn((Throwable)ie, "getCurrentParseErrors() interrupted.", new Object[0]);
            throw new RuntimeException(ie);
        }
        catch (ExecutionException ee) {
            throw new RuntimeException(ee);
        }
    }

    private Map<String, Map<String, Object>> getCurrentTotalStats() throws InterruptedException, ExecutionException {
        int groupId;
        HashMap<String, Map<String, Object>> allStats = new HashMap<String, Map<String, Object>>();
        ArrayList futures = new ArrayList();
        ArrayList<Pair> groupAndTaskIds = new ArrayList<Pair>();
        Iterator iterator = ((ConcurrentHashMap.KeySetView)this.activelyReadingTaskGroups.keySet()).iterator();
        while (iterator.hasNext()) {
            groupId = (Integer)iterator.next();
            TaskGroup group = this.activelyReadingTaskGroups.get(groupId);
            for (String taskId : group.taskIds()) {
                futures.add(Futures.transform(this.taskClient.getMovingAveragesAsync(taskId), currentStats -> new StatsFromTaskResult(groupId, taskId, (Map<String, Object>)currentStats), (Executor)MoreExecutors.directExecutor()));
                groupAndTaskIds.add(new Pair((Object)groupId, (Object)taskId));
            }
        }
        iterator = ((ConcurrentHashMap.KeySetView)this.pendingCompletionTaskGroups.keySet()).iterator();
        while (iterator.hasNext()) {
            groupId = (Integer)iterator.next();
            List pendingGroups = this.pendingCompletionTaskGroups.get(groupId);
            for (TaskGroup pendingGroup : pendingGroups) {
                for (String taskId : pendingGroup.taskIds()) {
                    futures.add(Futures.transform(this.taskClient.getMovingAveragesAsync(taskId), currentStats -> new StatsFromTaskResult(groupId, taskId, (Map<String, Object>)currentStats), (Executor)MoreExecutors.directExecutor()));
                    groupAndTaskIds.add(new Pair((Object)groupId, (Object)taskId));
                }
            }
        }
        List results = this.coalesceAndAwait(futures);
        for (int i = 0; i < results.size(); ++i) {
            if (results.get(i).isValue()) {
                StatsFromTaskResult result = (StatsFromTaskResult)results.get(i).valueOrThrow();
                if (result == null) continue;
                Map groupMap = allStats.computeIfAbsent(result.getGroupId(), k -> new HashMap());
                groupMap.put(result.getTaskId(), result.getStats());
                continue;
            }
            Pair groupAndTaskId = (Pair)groupAndTaskIds.get(i);
            log.noStackTrace().warn((Throwable)results.get(i).error(), "Failed to get stats for group[%d]-task[%s]", new Object[]{groupAndTaskId.lhs, groupAndTaskId.rhs});
        }
        return allStats;
    }

    private List<ParseExceptionReport> getCurrentParseErrors() throws InterruptedException, ExecutionException {
        int groupId;
        ArrayList futures = new ArrayList();
        ArrayList<Pair> groupAndTaskIds = new ArrayList<Pair>();
        Iterator iterator = ((ConcurrentHashMap.KeySetView)this.activelyReadingTaskGroups.keySet()).iterator();
        while (iterator.hasNext()) {
            groupId = (Integer)iterator.next();
            TaskGroup group = this.activelyReadingTaskGroups.get(groupId);
            for (String taskId : group.taskIds()) {
                futures.add(Futures.transform(this.taskClient.getParseErrorsAsync(taskId), taskErrors -> new ErrorsFromTaskResult(groupId, taskId, (List<ParseExceptionReport>)taskErrors), (Executor)MoreExecutors.directExecutor()));
                groupAndTaskIds.add(new Pair((Object)groupId, (Object)taskId));
            }
        }
        iterator = ((ConcurrentHashMap.KeySetView)this.pendingCompletionTaskGroups.keySet()).iterator();
        while (iterator.hasNext()) {
            groupId = (Integer)iterator.next();
            List pendingGroups = this.pendingCompletionTaskGroups.get(groupId);
            for (TaskGroup pendingGroup : pendingGroups) {
                for (String taskId : pendingGroup.taskIds()) {
                    futures.add(Futures.transform(this.taskClient.getParseErrorsAsync(taskId), taskErrors -> new ErrorsFromTaskResult(groupId, taskId, (List<ParseExceptionReport>)taskErrors), (Executor)MoreExecutors.directExecutor()));
                    groupAndTaskIds.add(new Pair((Object)groupId, (Object)taskId));
                }
            }
        }
        TreeSet<ParseExceptionReport> parseErrorsTreeSet = new TreeSet<ParseExceptionReport>(PARSE_EXCEPTION_REPORT_COMPARATOR);
        parseErrorsTreeSet.addAll(this.lastKnownParseErrors);
        List results = this.coalesceAndAwait(futures);
        for (int i = 0; i < results.size(); ++i) {
            if (results.get(i).isValue()) {
                ErrorsFromTaskResult result = (ErrorsFromTaskResult)results.get(i).valueOrThrow();
                if (result == null) continue;
                parseErrorsTreeSet.addAll(result.getErrors());
                continue;
            }
            Pair groupAndTaskId = (Pair)groupAndTaskIds.get(i);
            log.noStackTrace().warn((Throwable)results.get(i).error(), "Failed to get errors for group[%d]-task[%s]", new Object[]{groupAndTaskId.lhs, groupAndTaskId.rhs});
        }
        SeekableStreamIndexTaskTuningConfig ss = this.spec.getSpec().getTuningConfig().convertToTaskTuningConfig();
        SeekableStreamSupervisorIOConfig oo = this.spec.getSpec().getIOConfig();
        int parseErrorLimit = this.spec.getSpec().getTuningConfig().convertToTaskTuningConfig().getMaxSavedParseExceptions() * this.spec.getSpec().getIOConfig().getTaskCount();
        parseErrorLimit = Math.min(parseErrorLimit, parseErrorsTreeSet.size());
        ArrayList<ParseExceptionReport> limitedParseErrors = new ArrayList<ParseExceptionReport>();
        Iterator<ParseExceptionReport> descendingIterator = parseErrorsTreeSet.descendingIterator();
        for (int i = 0; i < parseErrorLimit; ++i) {
            limitedParseErrors.add(descendingIterator.next());
        }
        return limitedParseErrors;
    }

    @VisibleForTesting
    public TaskGroup addTaskGroupToActivelyReadingTaskGroup(int taskGroupId, ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets, @Nullable DateTime minMsgTime, @Nullable DateTime maxMsgTime, Set<String> tasks, Set<PartitionIdType> exclusiveStartingSequencePartitions) {
        TaskGroup group = new TaskGroup(taskGroupId, partitionOffsets, null, minMsgTime, maxMsgTime, exclusiveStartingSequencePartitions);
        group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
        if (this.activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
            throw new ISE("trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.", new Object[]{taskGroupId});
        }
        return group;
    }

    @VisibleForTesting
    public TaskGroup addTaskGroupToPendingCompletionTaskGroup(int taskGroupId, ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets, @Nullable DateTime minMsgTime, @Nullable DateTime maxMsgTime, Set<String> tasks, Set<PartitionIdType> exclusiveStartingSequencePartitions) {
        TaskGroup group = new TaskGroup(taskGroupId, partitionOffsets, null, minMsgTime, maxMsgTime, exclusiveStartingSequencePartitions);
        group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
        this.pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList()).add(group);
        return group;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void runInternal() {
        try {
            this.possiblyRegisterListener();
            this.stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
            if (!this.updatePartitionDataFromStream() && !this.stateManager.isAtLeastOneSuccessfulRun()) {
                return;
            }
            this.stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
            this.discoverTasks();
            this.updateTaskStatus();
            this.checkTaskDuration();
            this.checkPendingCompletionTasks();
            this.checkCurrentTaskState();
            this.checkIfStreamInactiveAndTurnSupervisorIdle();
            if (this.stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
                this.logDebugReport();
                return;
            }
            Object object = this.stateChangeLock;
            synchronized (object) {
                if (this.stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
                    log.debug("Supervisor[%s] for datasource[%s] is already stopping.", new Object[]{this.supervisorId, this.dataSource});
                } else if (this.stateManager.isIdle()) {
                    log.debug("Supervisor[%s] for datasource[%s] is idle.", new Object[]{this.supervisorId, this.dataSource});
                } else if (!this.spec.isSuspended()) {
                    log.debug("Supervisor[%s] for datasource[%s] is running.", new Object[]{this.supervisorId, this.dataSource});
                    this.stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
                    this.createNewTasks();
                } else {
                    log.debug("Supervisor[%s] for datasource[%s] is suspended.", new Object[]{this.supervisorId, this.dataSource});
                    this.gracefulShutdownInternal();
                }
            }
            this.logDebugReport();
        }
        catch (Exception e) {
            this.stateManager.recordThrowableEvent(e);
            log.makeAlert((Throwable)e, "Exception in supervisor run loop for supervisor[%s] for dataSource[%s]", new Object[]{this.supervisorId, this.dataSource}).emit();
        }
        finally {
            this.stateManager.markRunFinished();
        }
    }

    private void logDebugReport() {
        if (log.isDebugEnabled()) {
            log.debug("%s", new Object[]{this.generateReport(true)});
        }
    }

    private void possiblyRegisterListener() {
        if (this.listenerRegistered) {
            return;
        }
        Optional<TaskRunner> taskRunner = this.taskMaster.getTaskRunner();
        if (taskRunner.isPresent()) {
            ((TaskRunner)taskRunner.get()).registerListener(new TaskRunnerListener(){

                @Override
                public String getListenerId() {
                    return SeekableStreamSupervisor.this.supervisorTag;
                }

                @Override
                public void locationChanged(String taskId, TaskLocation newLocation) {
                }

                @Override
                public void statusChanged(String taskId, TaskStatus status) {
                    SeekableStreamSupervisor.this.addNotice(new RunNotice());
                }
            }, (Executor)Execs.directExecutor());
            this.listenerRegistered = true;
        }
    }

    @VisibleForTesting
    public void gracefulShutdownInternal() throws ExecutionException, InterruptedException {
        for (TaskGroup taskGroup : this.activelyReadingTaskGroups.values()) {
            for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                if (this.taskInfoProvider.getTaskLocation(entry.getKey()).equals((Object)TaskLocation.unknown())) {
                    this.killTask(entry.getKey(), "Killing task for graceful shutdown", new Object[0]);
                    continue;
                }
                entry.getValue().startTime = DateTimes.EPOCH;
            }
        }
        this.earlyStopTime = DateTimes.EPOCH;
        this.checkTaskDuration();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @VisibleForTesting
    public void resetInternal(DataSourceMetadata dataSourceMetadata) {
        if (dataSourceMetadata == null) {
            boolean result = this.indexerMetadataStorageCoordinator.deleteDataSourceMetadata(this.supervisorId);
            log.info("Reset supervisor[%s] for dataSource[%s] - dataSource metadata entry deleted? [%s]", new Object[]{this.supervisorId, this.dataSource, result});
            this.activelyReadingTaskGroups.values().forEach(group -> this.killTasksInGroup((TaskGroup)group, "DataSourceMetadata is not found while reset", new Object[0]));
            this.activelyReadingTaskGroups.clear();
            this.partitionGroups.clear();
            this.partitionOffsets.clear();
            return;
        }
        if (!this.checkSourceMetadataMatch(dataSourceMetadata)) {
            throw new IAE("Datasource metadata instance does not match required, found instance of [%s]", new Object[]{dataSourceMetadata.getClass()});
        }
        log.info("Reset supervisor[%s] for dataSource[%s] with metadata[%s]", new Object[]{this.supervisorId, this.dataSource, dataSourceMetadata});
        SeekableStreamDataSourceMetadata resetMetadata = (SeekableStreamDataSourceMetadata)dataSourceMetadata;
        if (resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(this.ioConfig.getStream())) {
            boolean metadataUpdateSuccess;
            DataSourceMetadata metadata = this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(this.supervisorId);
            if (metadata != null && !this.checkSourceMetadataMatch(metadata)) {
                throw new IAE("Datasource metadata instance does not match required, found instance of [%s]", new Object[]{metadata.getClass()});
            }
            SeekableStreamDataSourceMetadata currentMetadata = (SeekableStreamDataSourceMetadata)metadata;
            boolean doReset = false;
            for (Map.Entry resetPartitionOffset : resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().entrySet()) {
                boolean isSameOffset;
                Object partitionOffsetInMetadataStore = currentMetadata == null ? null : currentMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().get(resetPartitionOffset.getKey());
                TaskGroup partitionTaskGroup = this.activelyReadingTaskGroups.get(this.getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
                boolean bl = isSameOffset = partitionTaskGroup != null && partitionTaskGroup.startingSequences.containsKey(resetPartitionOffset.getKey()) && partitionTaskGroup.startingSequences.get(resetPartitionOffset.getKey()).equals(resetPartitionOffset.getValue());
                if (partitionOffsetInMetadataStore == null && !isSameOffset) continue;
                doReset = true;
                break;
            }
            if (!doReset) {
                log.info("Ignoring duplicate reset request [%s]", new Object[]{dataSourceMetadata});
                return;
            }
            if (currentMetadata == null) {
                metadataUpdateSuccess = true;
            } else {
                DataSourceMetadata newMetadata = currentMetadata.minus(resetMetadata);
                try {
                    metadataUpdateSuccess = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.supervisorId, newMetadata);
                }
                catch (IOException e) {
                    log.error("Resetting DataSourceMetadata failed [%s]", new Object[]{e.getMessage()});
                    throw new RuntimeException(e);
                }
            }
            if (!metadataUpdateSuccess) throw new ISE("Unable to reset metadata", new Object[0]);
            resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> {
                int groupId = this.getTaskGroupIdForPartition(partition);
                this.killTaskGroupForPartitions((Set<PartitionIdType>)ImmutableSet.of((Object)partition), "DataSourceMetadata is updated while reset", new Object[0]);
                this.activelyReadingTaskGroups.remove(groupId);
                this.partitionGroups.computeIfAbsent(groupId, k -> new HashSet());
                this.partitionOffsets.put(partition, this.getNotSetMarker());
            });
            return;
        }
        log.warn("Reset metadata stream [%s] and supervisor's stream name [%s] do not match", new Object[]{resetMetadata.getSeekableStreamSequenceNumbers().getStream(), this.ioConfig.getStream()});
    }

    public void resetOffsetsInternal(@Nonnull DataSourceMetadata dataSourceMetadata) {
        boolean metadataUpdateSuccess;
        log.info("Reset offsets for supervisor[%s] for dataSource[%s] with metadata[%s]", new Object[]{this.supervisorId, this.dataSource, dataSourceMetadata});
        SeekableStreamDataSourceMetadata resetMetadata = (SeekableStreamDataSourceMetadata)dataSourceMetadata;
        DataSourceMetadata metadata = this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(this.supervisorId);
        if (metadata == null) {
            log.info("Checkpointed metadata in null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", new Object[]{this.supervisorId, this.dataSource, resetMetadata});
            metadataUpdateSuccess = this.indexerMetadataStorageCoordinator.insertDataSourceMetadata(this.supervisorId, (DataSourceMetadata)resetMetadata);
        } else {
            if (!this.checkSourceMetadataMatch(metadata)) {
                throw InvalidInput.exception((String)"Datasource metadata instance does not match required, found instance of [%s]", (Object[])new Object[]{metadata.getClass()});
            }
            SeekableStreamDataSourceMetadata currentMetadata = (SeekableStreamDataSourceMetadata)metadata;
            DataSourceMetadata newMetadata = currentMetadata.plus(resetMetadata);
            log.info("Current checkpointed metadata[%s], new metadata[%s] for supervisor[%s] for dataSource[%s]", new Object[]{currentMetadata, newMetadata, this.supervisorId, this.dataSource});
            try {
                metadataUpdateSuccess = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.supervisorId, newMetadata);
            }
            catch (IOException e) {
                log.error("Reset offsets for supervisor[%s] for dataSource[%s] with metadata[%s] failed [%s]", new Object[]{this.supervisorId, this.dataSource, newMetadata, e.getMessage()});
                throw new RuntimeException(e);
            }
        }
        if (!metadataUpdateSuccess) {
            throw new ISE("Unable to reset metadata[%s] for supervisor[%s] for dataSource[%s]", new Object[]{this.supervisorId, this.dataSource, dataSourceMetadata});
        }
        resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> {
            int groupId = this.getTaskGroupIdForPartition(partition);
            this.killTaskGroupForPartitions((Set<PartitionIdType>)ImmutableSet.of((Object)partition), "DataSourceMetadata is updated while reset offsets is called", new Object[0]);
            this.activelyReadingTaskGroups.remove(groupId);
            this.partitionGroups.computeIfAbsent(groupId, k -> new HashSet());
            this.partitionOffsets.put(partition, this.getNotSetMarker());
        });
    }

    private void killTask(String id, String reasonFormat, Object ... args) {
        Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
        if (taskQueue.isPresent()) {
            ((TaskQueue)taskQueue.get()).shutdown(id, reasonFormat, args);
        } else {
            log.error("Failed to get task queue because I'm not the leader!", new Object[0]);
        }
    }

    private void killTaskWithSuccess(String id, String reasonFormat, Object ... args) {
        Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
        if (taskQueue.isPresent()) {
            ((TaskQueue)taskQueue.get()).shutdownWithSuccess(id, reasonFormat, args);
        } else {
            log.error("Failed to get task queue because I'm not the leader!", new Object[0]);
        }
    }

    private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, Object ... args) {
        if (taskGroup != null) {
            for (String taskId : taskGroup.tasks.keySet()) {
                this.killTask(taskId, reasonFormat, args);
            }
        }
    }

    private void killTaskGroupForPartitions(Set<PartitionIdType> partitions, String reasonFormat, Object ... args) {
        for (PartitionIdType partition : partitions) {
            int taskGroupId = this.getTaskGroupIdForPartition(partition);
            this.killTasksInGroup(this.activelyReadingTaskGroups.get(taskGroupId), reasonFormat, args);
            this.partitionGroups.remove(taskGroupId);
            this.activelyReadingTaskGroups.remove(taskGroupId);
        }
    }

    private boolean isTaskInPendingCompletionGroups(String taskId) {
        for (List list : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup taskGroup : list) {
                if (!taskGroup.tasks.containsKey(taskId)) continue;
                return true;
            }
        }
        return false;
    }

    public void handoffTaskGroupsEarly(List<Integer> taskGroupIds) {
        this.addNotice(new HandoffTaskGroupsNotice(taskGroupIds));
    }

    private void discoverTasks() throws ExecutionException, InterruptedException {
        String taskId;
        int taskCount = 0;
        ArrayList<String> futureTaskIds = new ArrayList<String>();
        ArrayList futures = new ArrayList();
        final HashMap taskGroupsToVerify = new HashMap();
        final Map<String, Task> activeTaskMap = this.getActiveTaskMap();
        for (Task task : activeTaskMap.values()) {
            Set taskPartitions;
            Sets.SetView inactivePartitionsInTask;
            if (!this.doesTaskMatchSupervisor(task)) continue;
            ++taskCount;
            final SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask)task;
            taskId = task.getId();
            if (this.supportsPartitionExpiration() && !(inactivePartitionsInTask = Sets.difference(taskPartitions = seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet(), new HashSet<PartitionIdType>(this.partitionIds))).isEmpty()) {
                this.killTaskWithSuccess(taskId, "Task partition set[%s] has inactive partitions[%s].", taskPartitions, inactivePartitionsInTask);
                continue;
            }
            Iterator it = seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet().iterator();
            final Integer taskGroupId = it.hasNext() ? Integer.valueOf(this.getTaskGroupIdForPartition(it.next())) : null;
            if (taskGroupId == null) continue;
            TaskGroup taskGroup = this.activelyReadingTaskGroups.get(taskGroupId);
            if (this.isTaskInPendingCompletionGroups(taskId) || taskGroup != null && taskGroup.tasks.containsKey(taskId)) continue;
            futureTaskIds.add(taskId);
            futures.add(Futures.transform(this.getStatusAndPossiblyEndOffsets(taskId), (Function)new Function<Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>, Boolean>(){

                public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>> pair) {
                    SeekableStreamIndexTaskRunner.Status status = (SeekableStreamIndexTaskRunner.Status)((Object)pair.lhs);
                    Map publishingTaskEndOffsets = (Map)pair.rhs;
                    try {
                        log.debug("Task [%s], status [%s]", new Object[]{taskId, status});
                        if (status == SeekableStreamIndexTaskRunner.Status.PUBLISHING) {
                            Object sequence;
                            Object partition2;
                            seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> SeekableStreamSupervisor.this.addDiscoveredTaskToPendingCompletionTaskGroups(SeekableStreamSupervisor.this.getTaskGroupIdForPartition(partition), taskId, seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap()));
                            boolean endOffsetsAreInvalid = false;
                            for (Map.Entry entry : publishingTaskEndOffsets.entrySet()) {
                                partition2 = entry.getKey();
                                sequence = entry.getValue();
                                if (!sequence.equals(SeekableStreamSupervisor.this.getEndOfPartitionMarker())) continue;
                                log.info("Got end-of-partition(EOS) marker for partition[%s] from task[%s] in discoverTasks, clearing partition offset to refetch from metadata.", new Object[]{partition2, taskId});
                                endOffsetsAreInvalid = true;
                                SeekableStreamSupervisor.this.partitionOffsets.put(partition2, SeekableStreamSupervisor.this.getNotSetMarker());
                            }
                            if (!endOffsetsAreInvalid) {
                                for (Map.Entry entry : publishingTaskEndOffsets.entrySet()) {
                                    boolean succeeded;
                                    partition2 = entry.getKey();
                                    sequence = entry.getValue();
                                    do {
                                        succeeded = true;
                                        Object previousOffset = SeekableStreamSupervisor.this.partitionOffsets.putIfAbsent(partition2, sequence);
                                        if (previousOffset == null || SeekableStreamSupervisor.this.makeSequenceNumber(previousOffset).compareTo(SeekableStreamSupervisor.this.makeSequenceNumber(sequence)) >= 0) continue;
                                        succeeded = SeekableStreamSupervisor.this.partitionOffsets.replace(partition2, previousOffset, sequence);
                                    } while (!succeeded);
                                }
                            }
                        } else {
                            for (Object partition3 : seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet()) {
                                if (taskGroupId.equals(SeekableStreamSupervisor.this.getTaskGroupIdForPartition(partition3))) continue;
                                log.warn("Stopping task[%s] as it does not match the current partition allocation.", new Object[]{taskId});
                                return false;
                            }
                            if (!SeekableStreamSupervisor.this.isTaskCurrent(taskGroupId, taskId, activeTaskMap)) {
                                log.info("Stopping task[%s] as it does not match the current supervisor spec.", new Object[]{taskId});
                                return false;
                            }
                            TaskGroup taskGroup = SeekableStreamSupervisor.this.activelyReadingTaskGroups.computeIfAbsent(taskGroupId, k -> {
                                log.info("Creating a new task group for taskGroupId[%d]", new Object[]{taskGroupId});
                                return new TaskGroup(taskGroupId, ImmutableMap.copyOf(seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap()), null, seekableStreamIndexTask.getIOConfig().getMinimumMessageTime(), seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(), seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getExclusivePartitions(), seekableStreamIndexTask.getIOConfig().getBaseSequenceName());
                            });
                            taskGroupsToVerify.put(taskGroupId, taskGroup);
                            TaskData prevTaskData = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                            if (prevTaskData != null) {
                                throw new ISE("Task[%s] already exists in taskGroup[%d] with data[%s]", new Object[]{taskId, taskGroup.groupId, prevTaskData});
                            }
                            SeekableStreamSupervisor.this.verifySameSequenceNameForAllTasksInGroup(taskGroupId);
                        }
                        return true;
                    }
                    catch (Throwable t) {
                        SeekableStreamSupervisor.this.stateManager.recordThrowableEvent(t);
                        log.error(t, "An error occurred while discovering task[%s]", new Object[]{taskId});
                        return null;
                    }
                }
            }, (Executor)this.workerExec));
        }
        List results = this.coalesceAndAwait(futures);
        ArrayList stopFutures = new ArrayList();
        for (int i = 0; i < results.size(); ++i) {
            taskId = (String)futureTaskIds.get(i);
            if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
                this.killTask(taskId, "Task[%s] failed to return status, killing task", taskId);
                continue;
            }
            if (!Boolean.valueOf(false).equals(results.get(i).valueOrThrow())) continue;
            stopFutures.add(this.stopTask(taskId, false));
        }
        log.debug("Found [%d] seekablestream indexing tasks for supervisor[%s] for datasource[%s]", new Object[]{taskCount, this.supervisorId, this.dataSource});
        if (!stopFutures.isEmpty()) {
            this.coalesceAndAwait(stopFutures);
        }
        this.verifyAndMergeCheckpoints(taskGroupsToVerify.values());
        this.resumeAllActivelyReadingTasks();
    }

    private ListenableFuture<Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>> getStatusAndPossiblyEndOffsets(String taskId) {
        return FutureUtils.transformAsync(this.taskClient.getStatusAsync(taskId), status -> {
            if (status == SeekableStreamIndexTaskRunner.Status.PUBLISHING) {
                return FutureUtils.transform(this.taskClient.getEndOffsetsAsync(taskId), endOffsets -> Pair.of((Object)status, (Object)endOffsets));
            }
            return Futures.immediateFuture((Object)Pair.of((Object)status, null));
        });
    }

    private void resumeAllActivelyReadingTasks() {
        if (!this.getState().isFirstRunOnly()) {
            return;
        }
        HashMap<String, ListenableFuture<Boolean>> tasksToResume = new HashMap<String, ListenableFuture<Boolean>>();
        if (this.activelyReadingTaskGroups.isEmpty()) {
            return;
        }
        if (!this.taskMaster.getTaskRunner().isPresent()) {
            return;
        }
        Set runningTaskIds = ((TaskRunner)this.taskMaster.getTaskRunner().get()).getRunningTasks().stream().map(TaskRunnerWorkItem::getTaskId).collect(Collectors.toSet());
        for (TaskGroup taskGroup : this.activelyReadingTaskGroups.values()) {
            for (String taskId : taskGroup.tasks.keySet()) {
                if (!runningTaskIds.contains(taskId)) continue;
                tasksToResume.put(taskId, this.taskClient.resumeAsync(taskId));
            }
        }
        String killMsg = "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.";
        for (Map.Entry entry : tasksToResume.entrySet()) {
            String taskId;
            taskId = (String)entry.getKey();
            ListenableFuture future = (ListenableFuture)entry.getValue();
            future.addListener(() -> {
                try {
                    if (((Boolean)((ListenableFuture)entry.getValue()).get()).booleanValue()) {
                        log.info("Resumed task[%s] in first supervisor run.", new Object[]{taskId});
                    } else {
                        log.warn("Failed to resume task[%s] in first supervisor run.", new Object[]{taskId});
                        this.killTask(taskId, "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.", new Object[0]);
                    }
                }
                catch (Exception e) {
                    log.warn((Throwable)e, "Failed to resume task[%s] in first supervisor run.", new Object[]{taskId});
                    this.killTask(taskId, "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.", new Object[0]);
                }
            }, (Executor)this.workerExec);
        }
    }

    private void verifyAndMergeCheckpoints(Collection<TaskGroup> taskGroupsToVerify) {
        ArrayList futures = new ArrayList();
        for (TaskGroup taskGroup : taskGroupsToVerify) {
            futures.add(this.verifyAndMergeCheckpoints(taskGroup));
        }
        try {
            this.coalesceAndAwait(futures);
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private ListenableFuture<?> verifyAndMergeCheckpoints(TaskGroup taskGroup) {
        ArrayList<ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> futures = new ArrayList<ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>>();
        ArrayList<String> taskIds = new ArrayList<String>();
        for (String taskId : taskGroup.taskIds()) {
            ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture = this.taskClient.getCheckpointsAsync(taskId, true);
            futures.add(checkpointsFuture);
            taskIds.add(taskId);
        }
        return Futures.transform((ListenableFuture)FutureUtils.coalesce(futures), futuresResult -> {
            this.verifyAndMergeCheckpoints(taskGroup, (List<String>)taskIds, (List<Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>>)futuresResult);
            return null;
        }, (Executor)this.workerExec);
    }

    private void verifyAndMergeCheckpoints(TaskGroup taskGroup, List<String> taskIds, List<Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> checkpointResults) {
        int groupId = taskGroup.groupId;
        ArrayList<Pair> taskSequences = new ArrayList<Pair>();
        for (int i = 0; i < checkpointResults.size(); ++i) {
            Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointResult = checkpointResults.get(i);
            String taskId = taskIds.get(i);
            if (checkpointResult.isError()) {
                RuntimeException e = new RuntimeException((Throwable)checkpointResult.error());
                this.stateManager.recordThrowableEvent(e);
                log.error((Throwable)e, "Problem while getting checkpoints for task [%s], killing the task", new Object[]{taskId});
                this.killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
                taskGroup.tasks.remove(taskId);
                continue;
            }
            if (((TreeMap)checkpointResult.valueOrThrow()).isEmpty()) {
                log.warn("Ignoring task [%s], as probably it is not started running yet", new Object[]{taskId});
                continue;
            }
            taskSequences.add(new Pair((Object)taskId, (Object)((TreeMap)checkpointResult.valueOrThrow())));
        }
        DataSourceMetadata rawDataSourceMetadata = this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(this.supervisorId);
        if (rawDataSourceMetadata != null && !this.checkSourceMetadataMatch(rawDataSourceMetadata)) {
            throw new IAE("Datasource metadata instance does not match required, found instance of [%s]", new Object[]{rawDataSourceMetadata.getClass()});
        }
        SeekableStreamDataSourceMetadata latestDataSourceMetadata = (SeekableStreamDataSourceMetadata)rawDataSourceMetadata;
        boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null && latestDataSourceMetadata.getSeekableStreamSequenceNumbers() != null && this.ioConfig.getStream().equals(latestDataSourceMetadata.getSeekableStreamSequenceNumbers().getStream());
        Map latestOffsetsFromDb = hasValidOffsetsFromDb ? latestDataSourceMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap() : null;
        taskSequences.sort((o1, o2) -> ((Integer)((TreeMap)o2.rhs).firstKey()).compareTo((Integer)((TreeMap)o1.rhs).firstKey()));
        HashSet<String> tasksToKill = new HashSet<String>();
        AtomicInteger earliestConsistentSequenceId = new AtomicInteger(-1);
        for (int taskIndex = 0; taskIndex < taskSequences.size(); ++taskIndex) {
            TreeMap taskCheckpoints = (TreeMap)((Pair)taskSequences.get((int)taskIndex)).rhs;
            String taskId = (String)((Pair)taskSequences.get((int)taskIndex)).lhs;
            if (earliestConsistentSequenceId.get() == -1) {
                if (taskCheckpoints.entrySet().stream().anyMatch(sequenceCheckpoint -> ((Map)sequenceCheckpoint.getValue()).entrySet().stream().allMatch(partitionOffset -> {
                    OrderedSequenceNumber sequence = this.makeSequenceNumber(partitionOffset.getValue());
                    OrderedSequenceNumber latestOffset = this.makeSequenceNumber(latestOffsetsFromDb == null ? partitionOffset.getValue() : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue()));
                    return sequence.compareTo(latestOffset) == 0;
                }) && earliestConsistentSequenceId.compareAndSet(-1, (Integer)sequenceCheckpoint.getKey())) || !this.pendingCompletionTaskGroups.getOrDefault(groupId, new CopyOnWriteArrayList()).isEmpty() && earliestConsistentSequenceId.compareAndSet(-1, (Integer)taskCheckpoints.firstKey())) {
                    TreeMap latestCheckpoints = new TreeMap(taskCheckpoints.tailMap(earliestConsistentSequenceId.get()));
                    log.info("Setting taskGroup sequences to [%s] for group [%d]", new Object[]{latestCheckpoints, groupId});
                    taskGroup.checkpointSequences.clear();
                    taskGroup.checkpointSequences.putAll(latestCheckpoints);
                    continue;
                }
                log.debug("Adding task[%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]", new Object[]{taskId, taskCheckpoints, latestOffsetsFromDb});
                tasksToKill.add(taskId);
                continue;
            }
            if (taskCheckpoints.get(taskGroup.checkpointSequences.firstKey()) != null && ((Map)taskCheckpoints.get(taskGroup.checkpointSequences.firstKey())).equals(taskGroup.checkpointSequences.firstEntry().getValue()) && taskCheckpoints.tailMap(taskGroup.checkpointSequences.firstKey()).size() == taskGroup.checkpointSequences.size()) continue;
            log.debug("Adding task[%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]", new Object[]{taskId, taskCheckpoints, taskGroup.checkpointSequences});
            tasksToKill.add(taskId);
        }
        if (!tasksToKill.isEmpty() && tasksToKill.size() == taskGroup.tasks.size() || taskGroup.tasks.isEmpty() && this.pendingCompletionTaskGroups.getOrDefault(groupId, new CopyOnWriteArrayList()).isEmpty()) {
            log.warn("Clearing task group[%d] information as no valid tasks left in the group", new Object[]{groupId});
            this.activelyReadingTaskGroups.remove(groupId);
            for (Object partitionId : taskGroup.startingSequences.keySet()) {
                this.partitionOffsets.put(partitionId, this.getNotSetMarker());
            }
        }
        taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(sequenceCheckpoint -> {
            this.killTask((String)sequenceCheckpoint.lhs, "Killing task[%s], as its checkpoints[%s] are not consistent with group checkpoints[%s] or latest persisted sequences in metadata store[%s].", sequenceCheckpoint.lhs, sequenceCheckpoint.rhs, taskGroup.checkpointSequences, latestOffsetsFromDb);
            taskGroup.tasks.remove(sequenceCheckpoint.lhs);
        });
    }

    @VisibleForTesting
    protected void addDiscoveredTaskToPendingCompletionTaskGroups(int groupId, String taskId, Map<PartitionIdType, SequenceOffsetType> startingPartitions) {
        CopyOnWriteArrayList taskGroupList = this.pendingCompletionTaskGroups.compute(groupId, (k, val) -> {
            if (val == null) {
                val = new CopyOnWriteArrayList<TaskGroup>();
            }
            boolean isTaskGroupPresent = false;
            for (TaskGroup taskGroup : val) {
                if (!taskGroup.startingSequences.equals((Object)startingPartitions)) continue;
                isTaskGroupPresent = true;
                break;
            }
            if (!isTaskGroupPresent) {
                log.info("Creating new pending completion task group[%s] for discovered task[%s].", new Object[]{groupId, taskId});
                TaskGroup newTaskGroup = new TaskGroup(groupId, ImmutableMap.copyOf((Map)startingPartitions), null, null, null, null);
                newTaskGroup.tasks.put(taskId, new TaskData());
                newTaskGroup.completionTimeout = DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getCompletionTimeout());
                val.add(newTaskGroup);
            }
            return val;
        });
        for (TaskGroup taskGroup : taskGroupList) {
            if (!taskGroup.startingSequences.equals(startingPartitions)) continue;
            if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
                log.info("Added discovered task[%s] to taskGroup[%s] with pending completion tasks[%s].", new Object[]{taskId, groupId, taskGroup.taskIds()});
            }
            return;
        }
    }

    @VisibleForTesting
    protected CopyOnWriteArrayList<TaskGroup> getPendingCompletionTaskGroups(int groupId) {
        return this.pendingCompletionTaskGroups.get(groupId);
    }

    private void verifySameSequenceNameForAllTasksInGroup(int groupId) {
        String taskGroupSequenceName = this.activelyReadingTaskGroups.get((Object)Integer.valueOf((int)groupId)).baseSequenceName;
        boolean allSequenceNamesMatch = this.activelyReadingTaskGroups.get((Object)Integer.valueOf((int)groupId)).tasks.keySet().stream().map(x -> {
            Optional<Task> taskOptional = this.taskStorage.getTask((String)x);
            if (!taskOptional.isPresent() || !this.doesTaskMatchSupervisor((Task)taskOptional.get())) {
                return Boolean.valueOf(false);
            }
            SeekableStreamIndexTask task = (SeekableStreamIndexTask)taskOptional.get();
            return task.getIOConfig().getBaseSequenceName();
        }).allMatch(taskSeqName -> taskSeqName.equals(taskGroupSequenceName));
        if (!allSequenceNamesMatch) {
            throw new ISE("Base sequence names do not match for the tasks in the task group with ID [%s]", new Object[]{groupId});
        }
    }

    private ListenableFuture<Void> stopTask(final String id, boolean publish) {
        return Futures.transform(this.taskClient.stopAsync(id, publish), (Function)new Function<Boolean, Void>(){

            @Nullable
            public Void apply(@Nullable Boolean result) {
                if (result == null || !result.booleanValue()) {
                    log.info("Killing task[%s] as it failed to stop in a timely manner.", new Object[]{id});
                    SeekableStreamSupervisor.this.killTask(id, "Failed to stop in a timely manner", new Object[0]);
                }
                return null;
            }
        }, (Executor)MoreExecutors.directExecutor());
    }

    @VisibleForTesting
    public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> activeTaskMap) {
        Task genericTask = activeTaskMap != null && activeTaskMap.containsKey(taskId) ? activeTaskMap.get(taskId) : (Task)this.taskStorage.getTask(taskId).orNull();
        if (genericTask == null || !this.doesTaskMatchSupervisor(genericTask)) {
            return false;
        }
        SeekableStreamIndexTask task = (SeekableStreamIndexTask)genericTask;
        String taskSequenceName = this.generateSequenceName(task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), task.getIOConfig().getMinimumMessageTime(), task.getIOConfig().getMaximumMessageTime(), task.getDataSchema(), task.getTuningConfig());
        if (this.activelyReadingTaskGroups.get(taskGroupId) != null) {
            TaskGroup taskGroup = this.activelyReadingTaskGroups.get(taskGroupId);
            return this.generateSequenceName((Map<PartitionIdType, SequenceOffsetType>)taskGroup.startingSequences, taskGroup.minimumMessageTime, taskGroup.maximumMessageTime, this.spec.getDataSchema(), this.taskTuningConfig).equals(taskSequenceName);
        }
        return this.generateSequenceName(task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), task.getIOConfig().getMinimumMessageTime(), task.getIOConfig().getMaximumMessageTime(), this.spec.getDataSchema(), this.taskTuningConfig).equals(taskSequenceName);
    }

    @VisibleForTesting
    public String generateSequenceName(Map<PartitionIdType, SequenceOffsetType> startPartitions, DateTime minimumMessageTime, DateTime maximumMessageTime, DataSchema dataSchema, SeekableStreamIndexTaskTuningConfig tuningConfig) {
        String tuningConfigStr;
        String dataSchemaStr;
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : startPartitions.entrySet()) {
            sb.append(StringUtils.format((String)"+%s(%s)", (Object[])new Object[]{entry.getKey().toString(), entry.getValue().toString()}));
        }
        String partitionOffsetStr = startPartitions.isEmpty() ? "" : sb.substring(1);
        String minMsgTimeStr = minimumMessageTime == null ? "" : String.valueOf(minimumMessageTime.getMillis());
        String maxMsgTimeStr = maximumMessageTime == null ? "" : String.valueOf(maximumMessageTime.getMillis());
        try {
            dataSchemaStr = this.sortingMapper.writeValueAsString((Object)dataSchema);
            tuningConfigStr = this.sortingMapper.writeValueAsString((Object)tuningConfig);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        String hashCode = DigestUtils.sha1Hex((String)(dataSchemaStr + tuningConfigStr + partitionOffsetStr + minMsgTimeStr + maxMsgTimeStr)).substring(0, 15);
        return Joiner.on((String)"_").join((Object)this.baseTaskName(), (Object)this.supervisorId, new Object[]{hashCode});
    }

    protected abstract String baseTaskName();

    protected boolean supportsPartitionExpiration() {
        return false;
    }

    public int getPartitionCount() {
        return this.recordSupplier.getPartitionIds(this.ioConfig.getStream()).size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean updatePartitionDataFromStream() {
        Set<PartitionIdType> partitionIdsFromSupplier;
        ArrayList<PartitionIdType> previousPartitionIds = new ArrayList<PartitionIdType>(this.partitionIds);
        this.recordSupplierLock.lock();
        try {
            partitionIdsFromSupplier = this.recordSupplier.getPartitionIds(this.ioConfig.getStream());
        }
        catch (Exception e) {
            this.stateManager.recordThrowableEvent(e);
            log.warn("Could not fetch partitions for topic/stream [%s]: %s", new Object[]{this.ioConfig.getStream(), e.getMessage()});
            log.debug((Throwable)e, "full stack trace", new Object[0]);
            boolean bl = false;
            return bl;
        }
        finally {
            this.recordSupplierLock.unlock();
        }
        if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() == 0) {
            String errMsg = StringUtils.format((String)"No partitions found for stream [%s]", (Object[])new Object[]{this.ioConfig.getStream()});
            this.stateManager.recordThrowableEvent(new StreamException((Throwable)new ISE(errMsg, new Object[0])));
            log.warn(errMsg, new Object[0]);
            return false;
        }
        log.debug("Found [%d] partitions for stream[%s]", new Object[]{partitionIdsFromSupplier.size(), this.ioConfig.getStream()});
        int configuredTaskCount = this.spec.getIoConfig().getTaskCount();
        if (configuredTaskCount > partitionIdsFromSupplier.size()) {
            log.warn("Configured task count[%s] for supervisor[%s] is greater than the number of partitions[%d].", new Object[]{configuredTaskCount, this.supervisorId, partitionIdsFromSupplier.size()});
        }
        Map<PartitionIdType, SequenceOffsetType> storedMetadata = this.getOffsetsFromMetadataStorage();
        Set<PartitionIdType> storedPartitions = storedMetadata.keySet();
        Set closedPartitions = storedMetadata.entrySet().stream().filter(x -> this.isEndOfShard(x.getValue())).map(Map.Entry::getKey).collect(Collectors.toSet());
        Set previouslyExpiredPartitions = storedMetadata.entrySet().stream().filter(x -> this.isShardExpirationMarker(x.getValue())).map(Map.Entry::getKey).collect(Collectors.toSet());
        Sets.SetView partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions = Sets.difference(partitionIdsFromSupplier, previouslyExpiredPartitions);
        Sets.SetView activePartitionsIdsFromSupplier = Sets.difference((Set)partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions, closedPartitions);
        Sets.SetView newlyClosedPartitions = Sets.intersection(closedPartitions, new HashSet<PartitionIdType>(previousPartitionIds));
        log.debug("active partitions from supplier: %s", new Object[]{activePartitionsIdsFromSupplier});
        if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() != partitionIdsFromSupplier.size()) {
            log.warn("Previously expired partitions [%s] were present in the current list [%s] from the record supplier.", new Object[]{previouslyExpiredPartitions, partitionIdsFromSupplier});
        }
        if (activePartitionsIdsFromSupplier.isEmpty()) {
            String errMsg = StringUtils.format((String)"No active partitions found for stream[%s] after removing closed and previously expired partitions", (Object[])new Object[]{this.ioConfig.getStream()});
            this.stateManager.recordThrowableEvent(new StreamException((Throwable)new ISE(errMsg, new Object[0])));
            log.warn(errMsg, new Object[0]);
            return false;
        }
        boolean initialPartitionDiscovery = this.partitionIds.isEmpty();
        for (Object partitionId : partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
            if (closedPartitions.contains(partitionId)) {
                log.info("Skipping partition[%s] as it is closed and has no more data.", new Object[]{partitionId});
                continue;
            }
            if (this.partitionIds.contains(partitionId)) continue;
            this.partitionIds.add(partitionId);
            if (initialPartitionDiscovery) continue;
            this.subsequentlyDiscoveredPartitions.add(partitionId);
        }
        if (this.supportsPartitionExpiration()) {
            this.cleanupClosedAndExpiredPartitions(storedPartitions, (Set<PartitionIdType>)newlyClosedPartitions, (Set<PartitionIdType>)activePartitionsIdsFromSupplier, previouslyExpiredPartitions, partitionIdsFromSupplier);
        }
        Int2ObjectLinkedOpenHashMap newlyDiscovered = new Int2ObjectLinkedOpenHashMap();
        for (Object partitionId : activePartitionsIdsFromSupplier) {
            int taskGroupId = this.getTaskGroupIdForPartition(partitionId);
            Set partitionGroup = this.partitionGroups.computeIfAbsent(taskGroupId, k -> new HashSet());
            partitionGroup.add(partitionId);
            if (this.partitionOffsets.putIfAbsent(partitionId, this.getNotSetMarker()) != null) continue;
            log.debug("Discovered new partition[%s] for stream [%s], added to taskGroup[%d]", new Object[]{partitionId, this.ioConfig.getStream(), taskGroupId});
            ((List)newlyDiscovered.computeIfAbsent(taskGroupId, k -> new ArrayList())).add(partitionId);
        }
        if (!newlyDiscovered.isEmpty()) {
            for (Int2ObjectMap.Entry taskGroupPartitions : newlyDiscovered.int2ObjectEntrySet()) {
                log.info("Discovered new partitions[%s] for stream[%s], added to taskGroup[%s].", new Object[]{taskGroupPartitions.getValue(), this.ioConfig.getStream(), taskGroupPartitions.getIntKey()});
            }
        }
        if (!this.partitionIds.equals(previousPartitionIds)) {
            this.assignRecordSupplierToPartitionIds();
            for (TaskGroup taskGroup : this.activelyReadingTaskGroups.values()) {
                if (taskGroup.taskIds().isEmpty()) continue;
                this.earlyStopTime = DateTimes.nowUtc().plus((ReadableDuration)this.tuningConfig.getRepartitionTransitionDuration());
                log.info("Previous partition set [%s] has changed to [%s] - requesting that tasks stop after [%s] at [%s]", new Object[]{previousPartitionIds, this.partitionIds, this.tuningConfig.getRepartitionTransitionDuration(), this.earlyStopTime});
                break;
            }
        }
        return true;
    }

    protected Map<PartitionIdType, SequenceOffsetType> getLatestSequencesFromStream() {
        return new HashMap();
    }

    private void assignRecordSupplierToPartitionIds() {
        block5: {
            this.recordSupplierLock.lock();
            try {
                Set partitions = this.partitionIds.stream().map(partitionId -> new StreamPartition<Object>(this.ioConfig.getStream(), partitionId)).collect(Collectors.toSet());
                if (this.recordSupplier.getAssignment().containsAll(partitions)) break block5;
                this.recordSupplier.assign(partitions);
                try {
                    this.recordSupplier.seekToEarliest(partitions);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            finally {
                this.recordSupplierLock.unlock();
            }
        }
    }

    private void cleanupClosedAndExpiredPartitions(Set<PartitionIdType> storedPartitions, Set<PartitionIdType> newlyClosedPartitions, Set<PartitionIdType> activePartitionsIdsFromSupplier, Set<PartitionIdType> previouslyExpiredPartitions, Set<PartitionIdType> partitionIdsFromSupplier) {
        Sets.SetView newlyExpiredPartitions = Sets.difference(storedPartitions, previouslyExpiredPartitions);
        if (!(newlyExpiredPartitions = Sets.difference((Set)newlyExpiredPartitions, partitionIdsFromSupplier)).isEmpty()) {
            log.info("Detected newly expired partitions: " + String.valueOf(newlyExpiredPartitions), new Object[0]);
            SeekableStreamDataSourceMetadata currentMetadata = (SeekableStreamDataSourceMetadata)this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(this.supervisorId);
            SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> cleanedMetadata = this.createDataSourceMetadataWithExpiredPartitions(currentMetadata, (Set<PartitionIdType>)newlyExpiredPartitions);
            log.info("New metadata after partition expiration: " + String.valueOf(cleanedMetadata), new Object[0]);
            this.validateMetadataPartitionExpiration((Set<PartitionIdType>)newlyExpiredPartitions, currentMetadata, cleanedMetadata);
            try {
                boolean success = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.supervisorId, cleanedMetadata);
                if (!success) {
                    log.error("Failed to update datasource metadata[%s] with expired partitions removed", new Object[]{cleanedMetadata});
                }
            }
            catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        }
        if (!newlyClosedPartitions.isEmpty()) {
            log.info("Detected newly closed partitions: " + String.valueOf(newlyClosedPartitions), new Object[0]);
        }
        if (!newlyClosedPartitions.isEmpty() || !newlyExpiredPartitions.isEmpty()) {
            Map<Integer, Set<PartitionIdType>> newPartitionGroups = this.recomputePartitionGroupsForExpiration(activePartitionsIdsFromSupplier);
            this.validatePartitionGroupReassignments(activePartitionsIdsFromSupplier, newPartitionGroups);
            log.info("New partition groups after removing closed and expired partitions: " + String.valueOf(newPartitionGroups), new Object[0]);
            this.partitionIds.clear();
            this.partitionIds.addAll(activePartitionsIdsFromSupplier);
            this.assignRecordSupplierToPartitionIds();
            for (Integer groupId : this.partitionGroups.keySet()) {
                if (newPartitionGroups.containsKey(groupId)) {
                    this.partitionGroups.put(groupId, newPartitionGroups.get(groupId));
                    continue;
                }
                this.partitionGroups.put(groupId, new HashSet());
            }
        }
    }

    protected Map<Integer, Set<PartitionIdType>> recomputePartitionGroupsForExpiration(Set<PartitionIdType> availablePartitions) {
        throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
    }

    protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata, Set<PartitionIdType> expiredPartitionIds) {
        throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
    }

    private void validateMetadataPartitionExpiration(Set<PartitionIdType> newlyExpiredPartitions, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> oldMetadata, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> cleanedMetadata) {
        Map<PartitionIdType, SequenceOffsetType> oldPartitionSeqNos = oldMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap();
        Map<PartitionIdType, SequenceOffsetType> cleanedPartitionSeqNos = cleanedMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap();
        for (Map.Entry<PartitionIdType, SequenceOffsetType> cleanedPartitionSeqNo : cleanedPartitionSeqNos.entrySet()) {
            if (!oldPartitionSeqNos.containsKey(cleanedPartitionSeqNo.getKey())) {
                throw new IAE("Cleaned partition map [%s] contains unexpected partition ID [%s], original partition map: [%s]", new Object[]{cleanedPartitionSeqNos, cleanedPartitionSeqNo.getKey(), oldPartitionSeqNos});
            }
            SequenceOffsetType oldOffset = oldPartitionSeqNos.get(cleanedPartitionSeqNo.getKey());
            if (newlyExpiredPartitions.contains(cleanedPartitionSeqNo.getKey())) {
                if (this.isShardExpirationMarker(cleanedPartitionSeqNo.getValue())) continue;
                throw new IAE("Newly expired partition [%] was not marked as expired in the cleaned partition map [%s], original partition map: [%s]", new Object[]{cleanedPartitionSeqNo.getKey(), cleanedPartitionSeqNos, oldPartitionSeqNos});
            }
            if (oldOffset.equals(cleanedPartitionSeqNo.getValue())) continue;
            throw new IAE("Cleaned partition map [%s] has offset mismatch for partition ID [%s], original partition map: [%s]", new Object[]{cleanedPartitionSeqNos, cleanedPartitionSeqNo.getKey(), oldPartitionSeqNos});
        }
    }

    private void validatePartitionGroupReassignments(Set<PartitionIdType> activePartitionsIdsFromSupplier, Map<Integer, Set<PartitionIdType>> newPartitionGroups) {
        for (Set<PartitionIdType> newGroup : newPartitionGroups.values()) {
            for (PartitionIdType partitionInNewGroup : newGroup) {
                if (activePartitionsIdsFromSupplier.contains(partitionInNewGroup)) continue;
                throw new IAE("Recomputed partition groups [%s] contains unexpected partition ID [%s], old partition groups: [%s]", new Object[]{newPartitionGroups, partitionInNewGroup, this.partitionGroups});
            }
        }
    }

    /*
     * WARNING - void declaration
     */
    private void updateTaskStatus() throws ExecutionException, InterruptedException {
        void var4_8;
        ArrayList futures = new ArrayList();
        ArrayList<String> futureTaskIds = new ArrayList<String>();
        for (TaskGroup taskGroup : this.activelyReadingTaskGroups.values()) {
            for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                String taskId = entry.getKey();
                final TaskData taskData = entry.getValue();
                if (taskData.startTime == null) {
                    futureTaskIds.add(taskId);
                    futures.add(Futures.transform(this.taskClient.getStartTimeAsync(taskId), (Function)new Function<DateTime, Boolean>(){

                        public Boolean apply(@Nullable DateTime startTime) {
                            if (startTime == null) {
                                return false;
                            }
                            taskData.startTime = startTime;
                            long millisRemaining = SeekableStreamSupervisor.this.ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() - taskData.startTime.getMillis());
                            if (millisRemaining > 0L) {
                                SeekableStreamSupervisor.this.scheduledExec.schedule(SeekableStreamSupervisor.this.buildRunTask(), millisRemaining + 1000L, TimeUnit.MILLISECONDS);
                            }
                            return true;
                        }
                    }, (Executor)this.workerExec));
                }
                taskData.status = (TaskStatus)this.taskStorage.getStatus(taskId).get();
            }
        }
        for (List list : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup group : list) {
                for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
                    entry.getValue().status = (TaskStatus)this.taskStorage.getStatus(entry.getKey()).get();
                }
            }
        }
        List results = this.coalesceAndAwait(futures);
        boolean bl = false;
        while (var4_8 < results.size()) {
            if (results.get((int)var4_8).isError()) {
                String taskId = (String)futureTaskIds.get((int)var4_8);
                log.noStackTrace().warn((Throwable)results.get((int)var4_8).error(), "Task [%s] failed to return start time, killing task", new Object[]{taskId});
                this.killTask(taskId, "Task [%s] failed to return start time, killing task", taskId);
            }
            ++var4_8;
        }
    }

    private void checkTaskDuration() throws ExecutionException, InterruptedException {
        boolean stopTasksEarly;
        ArrayList futures = new ArrayList();
        ArrayList futureGroupIds = new ArrayList();
        if (this.earlyStopTime != null && !this.earlyStopTime.isAfterNow()) {
            log.info("Early stop requested for supervisor[%s], signalling tasks to complete.", new Object[]{this.supervisorId});
            this.earlyStopTime = null;
            stopTasksEarly = true;
        } else {
            stopTasksEarly = false;
        }
        AtomicInteger numStoppedTasks = new AtomicInteger();
        this.activelyReadingTaskGroups.entrySet().stream().sorted(Comparator.comparingLong(taskGroupEntry -> this.computeEarliestTaskStartTime((TaskGroup)taskGroupEntry.getValue()).getMillis())).forEach(entry -> {
            int numPendingCompletionTaskGroups;
            Integer groupId = (Integer)entry.getKey();
            TaskGroup group = (TaskGroup)entry.getValue();
            DateTime earliestTaskStart = this.computeEarliestTaskStartTime(group);
            Duration runDuration = Duration.millis((long)(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis()));
            if (stopTasksEarly || group.getHandoffEarly().booleanValue()) {
                log.info("Stopping taskGroup[%d] early after running for duration[%s].", new Object[]{groupId, runDuration});
                futureGroupIds.add(groupId);
                futures.add(this.checkpointTaskGroup(group, true));
                if (group.getHandoffEarly().booleanValue()) {
                    numStoppedTasks.getAndIncrement();
                }
            } else if (earliestTaskStart.plus((ReadableDuration)this.ioConfig.getTaskDuration()).isBeforeNow() && (numPendingCompletionTaskGroups = this.pendingCompletionTaskGroups.values().stream().mapToInt(List::size).sum()) + numStoppedTasks.get() < this.ioConfig.getMaxAllowedStops()) {
                log.info("Stopping taskGroup[%d] as it has already run for duration[%s], configured task duration[%s].", new Object[]{groupId, runDuration, this.ioConfig.getTaskDuration()});
                futureGroupIds.add(groupId);
                futures.add(this.checkpointTaskGroup(group, true));
                numStoppedTasks.getAndIncrement();
            }
        });
        List results = this.coalesceAndAwait(futures);
        for (int j = 0; j < results.size(); ++j) {
            Integer groupId = (Integer)futureGroupIds.get(j);
            TaskGroup group = this.activelyReadingTaskGroups.get(groupId);
            if (results.get(j).isValue() && results.get(j).valueOrThrow() != null) {
                Map endOffsets = (Map)results.get(j).valueOrThrow();
                group.completionTimeout = DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getCompletionTimeout());
                this.pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList()).add(group);
                boolean endOffsetsAreInvalid = false;
                for (Map.Entry entry2 : endOffsets.entrySet()) {
                    if (!entry2.getValue().equals(this.getEndOfPartitionMarker())) continue;
                    log.info("Got end-of-partition(EOS) marker for partition[%s] in checkTaskDuration, not updating partition offset.", new Object[]{entry2.getKey()});
                    endOffsetsAreInvalid = true;
                }
                if (!endOffsetsAreInvalid) {
                    this.partitionOffsets.putAll(endOffsets);
                } else {
                    for (Map.Entry entry2 : endOffsets.entrySet()) {
                        this.partitionOffsets.put(entry2.getKey(), this.getNotSetMarker());
                    }
                }
            } else {
                for (String id : group.taskIds()) {
                    this.killTask(id, "All tasks in group[%s] failed to transition to publishing state", groupId);
                }
                for (Object partitionId : group.startingSequences.keySet()) {
                    this.partitionOffsets.put(partitionId, this.getNotSetMarker());
                }
            }
            this.activelyReadingTaskGroups.remove(groupId);
        }
    }

    private DateTime computeEarliestTaskStartTime(TaskGroup group) {
        return group.tasks.values().stream().filter(taskData -> taskData.startTime != null).map(taskData -> taskData.startTime).min(AbstractInstant::compareTo).orElse(DateTimes.nowUtc());
    }

    private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> checkpointTaskGroup(TaskGroup taskGroup, boolean finalize) {
        if (finalize) {
            Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
            while (i.hasNext()) {
                Map.Entry<String, TaskData> taskEntry = i.next();
                String taskId = taskEntry.getKey();
                TaskData task = taskEntry.getValue();
                if (task.status == null) continue;
                if (task.status.isSuccess()) {
                    this.stateManager.recordCompletedTaskState(TaskState.SUCCESS);
                    return Futures.transform(this.stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", task.status.getId()), (Function)new Function<Object, Map<PartitionIdType, SequenceOffsetType>>(){

                        @Nullable
                        public Map<PartitionIdType, SequenceOffsetType> apply(@Nullable Object input) {
                            return null;
                        }
                    }, (Executor)MoreExecutors.directExecutor());
                }
                if (!task.status.isRunnable() || !this.taskInfoProvider.getTaskLocation(taskId).equals((Object)TaskLocation.unknown())) continue;
                this.killTask(taskId, "Killing task [%s] which hasn't been assigned to a worker", taskId);
                i.remove();
            }
        }
        ArrayList<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> pauseFutures = new ArrayList<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>>();
        ImmutableList pauseTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
        for (String taskId : pauseTaskIds) {
            pauseFutures.add(this.taskClient.pauseAsync(taskId));
        }
        return Futures.transformAsync((ListenableFuture)FutureUtils.coalesce(pauseFutures), (AsyncFunction)new AsyncFunction<List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>>, Map<PartitionIdType, SequenceOffsetType>>((List)pauseTaskIds, taskGroup, finalize){
            final /* synthetic */ List val$pauseTaskIds;
            final /* synthetic */ TaskGroup val$taskGroup;
            final /* synthetic */ boolean val$finalize;
            {
                this.val$pauseTaskIds = list;
                this.val$taskGroup = taskGroup;
                this.val$finalize = bl;
            }

            public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> apply(List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> input) {
                HashMap endOffsets = new HashMap();
                for (int i = 0; i < input.size(); ++i) {
                    String taskId = (String)this.val$pauseTaskIds.get(i);
                    if (input.get(i).isError()) {
                        Throwable pauseException = (Throwable)input.get(i).error();
                        SeekableStreamSupervisor.this.stateManager.recordThrowableEvent(pauseException);
                        SeekableStreamSupervisor.this.killTask(taskId, "An exception occurred while waiting for task [%s] to pause: [%s]", taskId, pauseException);
                        this.val$taskGroup.tasks.remove(taskId);
                        continue;
                    }
                    if (input.get(i).valueOrThrow() == null || ((Map)input.get(i).valueOrThrow()).isEmpty()) {
                        SeekableStreamSupervisor.this.killTask(taskId, "Task [%s] returned empty offsets after pause", taskId);
                        this.val$taskGroup.tasks.remove(taskId);
                        continue;
                    }
                    for (Map.Entry sequence : ((Map)input.get(i).valueOrThrow()).entrySet()) {
                        if (endOffsets.containsKey(sequence.getKey()) && SeekableStreamSupervisor.this.makeSequenceNumber(endOffsets.get(sequence.getKey())).compareTo(SeekableStreamSupervisor.this.makeSequenceNumber(sequence.getValue())) >= 0) continue;
                        endOffsets.put(sequence.getKey(), sequence.getValue());
                    }
                }
                ArrayList<ListenableFuture<Boolean>> setEndOffsetFutures = new ArrayList<ListenableFuture<Boolean>>();
                ImmutableList setEndOffsetTaskIds = ImmutableList.copyOf(this.val$taskGroup.taskIds());
                if (setEndOffsetTaskIds.isEmpty()) {
                    log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{this.val$taskGroup.groupId});
                    return Futures.immediateFuture(null);
                }
                if (endOffsets.equals(this.val$taskGroup.checkpointSequences.lastEntry().getValue())) {
                    log.warn("Checkpoint[%s] is same as the start sequences[%s] of latest sequence for the taskGroup[%d].", new Object[]{endOffsets, this.val$taskGroup.checkpointSequences.lastEntry().getValue(), this.val$taskGroup.groupId});
                }
                log.info("Setting endOffsets for tasks in taskGroup[%d] to [%s]", new Object[]{this.val$taskGroup.groupId, endOffsets});
                for (String taskId : setEndOffsetTaskIds) {
                    setEndOffsetFutures.add(SeekableStreamSupervisor.this.taskClient.setEndOffsetsAsync(taskId, endOffsets, this.val$finalize));
                }
                return Futures.transform((ListenableFuture)FutureUtils.coalesce(setEndOffsetFutures), arg_0 -> this.lambda$apply$0((List)setEndOffsetTaskIds, this.val$taskGroup, endOffsets, arg_0), (Executor)SeekableStreamSupervisor.this.workerExec);
            }

            private /* synthetic */ Map lambda$apply$0(List setEndOffsetTaskIds, TaskGroup taskGroup, Map endOffsets, List results) {
                try {
                    for (int i = 0; i < results.size(); ++i) {
                        if (((Either)results.get(i)).isValue() && Boolean.valueOf(true).equals(((Either)results.get(i)).valueOrThrow())) {
                            log.info("Successfully set endOffsets for task[%s] and resumed it", new Object[]{setEndOffsetTaskIds.get(i)});
                            continue;
                        }
                        String taskId = (String)setEndOffsetTaskIds.get(i);
                        SeekableStreamSupervisor.this.killTask(taskId, "Failed to set end offsets, killing task", new Object[0]);
                        taskGroup.tasks.remove(taskId);
                    }
                }
                catch (Exception e) {
                    log.error("An exception occurred while setting end offsets: [%s]", new Object[]{e.getMessage()});
                    throw new RuntimeException(e);
                }
                if (taskGroup.tasks.isEmpty()) {
                    log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", new Object[]{taskGroup.groupId});
                    return null;
                }
                return endOffsets;
            }
        }, (Executor)this.workerExec);
    }

    private ListenableFuture<Void> stopTasksInGroup(@Nullable TaskGroup taskGroup, String stopReasonFormat, Object ... args) {
        if (taskGroup == null) {
            return Futures.immediateFuture(null);
        }
        log.info("Stopping all tasks in taskGroup[%s] because [%s].", new Object[]{taskGroup.groupId, StringUtils.format((String)stopReasonFormat, (Object[])args)});
        ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
        for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
            String taskId = entry.getKey();
            TaskData taskData = entry.getValue();
            if (taskData.status == null) {
                this.killTask(taskId, "Killing task since task status is not known to supervisor", new Object[0]);
                continue;
            }
            if (taskData.status.isComplete()) continue;
            futures.add(this.stopTask(taskId, false));
        }
        return FutureUtils.transform((ListenableFuture)FutureUtils.coalesce(futures), xs -> null);
    }

    private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException {
        ArrayList futures = new ArrayList();
        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : this.pendingCompletionTaskGroups.entrySet()) {
            boolean stopTasksInTaskGroup = false;
            Integer groupId = pendingGroupList.getKey();
            CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingGroupList.getValue();
            ArrayList<TaskGroup> toRemove = new ArrayList<TaskGroup>();
            for (TaskGroup group : taskGroupList) {
                boolean foundSuccess = false;
                boolean entireTaskGroupFailed = false;
                if (stopTasksInTaskGroup) {
                    futures.add(this.stopTasksInGroup(group, "one of earlier groups that was handling the same partition set timed out before publishing segments", new Object[0]));
                    toRemove.add(group);
                    continue;
                }
                Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
                while (iTask.hasNext()) {
                    Map.Entry<String, TaskData> entry = iTask.next();
                    String taskId = entry.getKey();
                    TaskData taskData = entry.getValue();
                    Preconditions.checkNotNull((Object)taskData.status, (String)"Task[%s] has null status", (Object)taskId);
                    if (taskData.status.isFailure()) {
                        this.stateManager.recordCompletedTaskState(TaskState.FAILED);
                        iTask.remove();
                        if (group.tasks.isEmpty()) {
                            entireTaskGroupFailed = true;
                            break;
                        }
                    }
                    if (!taskData.status.isSuccess()) continue;
                    log.info("Task[%s] completed successfully, stopping tasks[%s]", new Object[]{taskId, group.taskIds()});
                    this.stateManager.recordCompletedTaskState(TaskState.SUCCESS);
                    futures.add(this.stopTasksInGroup(group, "Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()));
                    foundSuccess = true;
                    toRemove.add(group);
                    break;
                }
                if ((foundSuccess || !group.completionTimeout.isBeforeNow()) && !entireTaskGroupFailed) continue;
                if (entireTaskGroupFailed) {
                    log.warn("All tasks in taskGroup[%d] failed to publish, killing all tasks for these partitions", new Object[]{groupId});
                } else {
                    log.makeAlert("No task in [%s] for taskGroup[%d] succeeded before the completion timeout elapsed [%s]! Check metrics and logs to see if the creation, publish or handoff of any segment is taking longer than usual.", new Object[]{group.taskIds(), groupId, this.ioConfig.getCompletionTimeout()}).emit();
                }
                for (Object partitionId : group.startingSequences.keySet()) {
                    this.partitionOffsets.put(partitionId, this.getNotSetMarker());
                }
                this.killTasksInGroup(group, "No task in pending completion taskGroup[%d] succeeded before completion timeout elapsed", groupId);
                stopTasksInTaskGroup = true;
                this.killTasksInGroup(this.activelyReadingTaskGroups.remove(groupId), "No task in the corresponding pending completion taskGroup[%d] succeeded before completion timeout elapsed", groupId);
                toRemove.add(group);
            }
            taskGroupList.removeAll(toRemove);
        }
        this.coalesceAndAwait(futures);
    }

    private void checkCurrentTaskState() throws ExecutionException, InterruptedException {
        Map<String, Task> activeTaskMap = this.getActiveTaskMap();
        ArrayList futures = new ArrayList();
        Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = this.activelyReadingTaskGroups.entrySet().iterator();
        while (iTaskGroups.hasNext()) {
            Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
            Integer groupId = taskGroupEntry.getKey();
            TaskGroup taskGroup = taskGroupEntry.getValue();
            log.debug("taskGroup[%d] pre-pruning: %s.", new Object[]{groupId, taskGroup.taskIds()});
            Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
            while (iTasks.hasNext()) {
                Map.Entry<String, TaskData> task = iTasks.next();
                String taskId = task.getKey();
                TaskData taskData = task.getValue();
                if (!this.isTaskCurrent(groupId, taskId, activeTaskMap)) {
                    log.info("Stopping task[%s] as it does not match the expected sequence range and ingestion spec.", new Object[]{taskId});
                    futures.add(this.stopTask(taskId, false));
                    iTasks.remove();
                    continue;
                }
                Preconditions.checkNotNull((Object)taskData.status, (String)"Task[%s] has null status", (Object)taskId);
                if (taskData.status.isFailure()) {
                    this.stateManager.recordCompletedTaskState(TaskState.FAILED);
                    iTasks.remove();
                    continue;
                }
                if (!taskData.status.isSuccess()) continue;
                this.stateManager.recordCompletedTaskState(TaskState.SUCCESS);
                futures.add(this.stopTasksInGroup(taskGroup, "task[%s] succeeded in the same taskGroup", taskData.status.getId()));
                iTaskGroups.remove();
                break;
            }
            log.debug("After pruning, taskGroup[%d] has tasks[%s].", new Object[]{groupId, taskGroup.taskIds()});
        }
        this.coalesceAndAwait(futures);
    }

    private void checkIfStreamInactiveAndTurnSupervisorIdle() {
        boolean idle;
        long idleTime;
        Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream;
        if (!this.idleConfig.isEnabled() || this.spec.isSuspended()) {
            return;
        }
        long nowTime = DateTimes.nowUtc().getMillis();
        if (!this.stateManager.isAtLeastOneSuccessfulRun()) {
            this.previousSequencesFromStream.clear();
            this.previousSequencesFromStream.putAll(this.getOffsetsFromMetadataStorage());
            this.updatePartitionLagFromStream();
        }
        if (this.previousSequencesFromStream.equals(latestSequencesFromStream = this.getLatestSequencesFromStream()) && this.computeTotalLag() == 0L) {
            idleTime = nowTime - this.lastActiveTimeMillis;
            idle = true;
        } else {
            idleTime = 0L;
            this.lastActiveTimeMillis = nowTime;
            idle = false;
        }
        this.previousSequencesFromStream.clear();
        this.previousSequencesFromStream.putAll(latestSequencesFromStream);
        if (!idle) {
            this.stateManager.maybeSetState((SupervisorStateManager.State)SupervisorStateManager.BasicState.RUNNING);
        } else if (!this.stateManager.isIdle() && idleTime > this.idleConfig.getInactiveAfterMillis()) {
            log.info("Setting supervisor[%s] to state[%s]", new Object[]{this.supervisorId, SupervisorStateManager.BasicState.IDLE.name()});
            this.stateManager.maybeSetState((SupervisorStateManager.State)SupervisorStateManager.BasicState.IDLE);
            if (this.stateManager.isIdle()) {
                log.info("Successfully set supervisor[%s] to state[%s]", new Object[]{this.supervisorId, SupervisorStateManager.BasicState.IDLE.name()});
            }
        }
    }

    private long computeTotalLag() {
        LagStats lagStats = this.computeLagStats();
        return lagStats != null ? lagStats.getTotalLag() : 0L;
    }

    protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> filterExpiredPartitionsFromStartingOffsets(Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> startingOffsets) {
        return startingOffsets;
    }

    private void createNewTasks() throws JsonProcessingException {
        this.verifyAndMergeCheckpoints(this.activelyReadingTaskGroups.values().stream().filter(taskGroup -> taskGroup.tasks.size() < this.ioConfig.getReplicas()).collect(Collectors.toList()));
        for (Integer groupId : this.partitionGroups.keySet()) {
            if (this.activelyReadingTaskGroups.containsKey(groupId)) continue;
            log.info("Creating new taskGroup[%d] for partitions[%s].", new Object[]{groupId, this.partitionGroups.get(groupId)});
            DateTime minimumMessageTime = this.ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? (DateTime)this.ioConfig.getLateMessageRejectionStartDateTime().get() : (this.ioConfig.getLateMessageRejectionPeriod().isPresent() ? DateTimes.nowUtc().minus((ReadableDuration)this.ioConfig.getLateMessageRejectionPeriod().get()) : null);
            DateTime maximumMessageTime = this.ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? DateTimes.nowUtc().plus((ReadableDuration)this.ioConfig.getTaskDuration()).plus((ReadableDuration)this.ioConfig.getEarlyMessageRejectionPeriod().get()) : null;
            Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> unfilteredStartingOffsets = this.generateStartingSequencesForPartitionGroup(groupId);
            Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> startingOffsets = this.supportsPartitionExpiration() ? this.filterExpiredPartitionsFromStartingOffsets(unfilteredStartingOffsets) : unfilteredStartingOffsets;
            ImmutableMap simpleStartingOffsets = startingOffsets.entrySet().stream().filter(entry -> ((OrderedSequenceNumber)entry.getValue()).get() != null).collect(Collectors.collectingAndThen(Collectors.toMap(Map.Entry::getKey, entry -> ((OrderedSequenceNumber)entry.getValue()).get()), ImmutableMap::copyOf));
            ImmutableMap simpleUnfilteredStartingOffsets = this.supportsPartitionExpiration() ? unfilteredStartingOffsets.entrySet().stream().filter(entry -> ((OrderedSequenceNumber)entry.getValue()).get() != null).collect(Collectors.collectingAndThen(Collectors.toMap(Map.Entry::getKey, entry -> ((OrderedSequenceNumber)entry.getValue()).get()), ImmutableMap::copyOf)) : simpleStartingOffsets;
            Set exclusiveStartSequenceNumberPartitions = !this.useExclusiveStartingSequence ? Collections.emptySet() : startingOffsets.entrySet().stream().filter(x -> ((OrderedSequenceNumber)x.getValue()).get() != null && ((OrderedSequenceNumber)x.getValue()).isExclusive()).map(Map.Entry::getKey).collect(Collectors.toSet());
            this.activelyReadingTaskGroups.put(groupId, new TaskGroup(groupId, simpleStartingOffsets, simpleUnfilteredStartingOffsets, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions));
        }
        boolean createdTask = false;
        for (Map.Entry<Integer, TaskGroup> entry2 : this.activelyReadingTaskGroups.entrySet()) {
            TaskGroup taskGroup2 = entry2.getValue();
            Integer groupId = entry2.getKey();
            if (taskGroup2.startingSequences == null || taskGroup2.startingSequences.isEmpty() || taskGroup2.startingSequences.values().stream().allMatch(x -> x == null || this.isEndOfShard(x))) {
                log.debug("Nothing to read in any partition for taskGroup[%d], skipping task creation.", new Object[]{groupId});
                continue;
            }
            if (this.ioConfig.getReplicas() <= taskGroup2.tasks.size()) continue;
            log.info("Number of tasks[%d] does not match configured numReplicas[%d] in taskGroup[%d], creating more tasks.", new Object[]{taskGroup2.tasks.size(), this.ioConfig.getReplicas(), groupId});
            this.createTasksForGroup(groupId, this.ioConfig.getReplicas() - taskGroup2.tasks.size());
            createdTask = true;
        }
        if (createdTask && this.firstRunTime.isBeforeNow()) {
            this.scheduledExec.schedule(this.buildRunTask(), 5000L, TimeUnit.MILLISECONDS);
        }
    }

    private void addNotice(Notice notice) {
        log.debug("Adding notice [%s] to notices queue", new Object[]{notice.getClass().getName()});
        this.notices.add(notice);
    }

    @VisibleForTesting
    public void moveTaskGroupToPendingCompletion(int taskGroupId) {
        TaskGroup taskGroup = this.activelyReadingTaskGroups.remove(taskGroupId);
        if (taskGroup != null) {
            this.pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList()).add(taskGroup);
        }
    }

    @VisibleForTesting
    public int getNoticesQueueSize() {
        return this.notices.size();
    }

    private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(int groupId) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Map<PartitionIdType, SequenceOffsetType> metadataOffsets = this.getOffsetsFromMetadataStorage();
        for (PartitionIdType partitionId : this.partitionGroups.get(groupId)) {
            SequenceOffsetType sequence = this.partitionOffsets.get(partitionId);
            if (!this.getNotSetMarker().equals(sequence)) {
                if (this.isEndOfShard(sequence)) continue;
                builder.put(partitionId, this.makeSequenceNumber(sequence, this.useExclusiveStartSequenceNumberForNonFirstSequence()));
                continue;
            }
            OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = this.getOffsetFromStorageForPartition(partitionId, metadataOffsets);
            if (offsetFromStorage == null) continue;
            builder.put(partitionId, offsetFromStorage);
        }
        return builder.build();
    }

    private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition(PartitionIdType partition, Map<PartitionIdType, SequenceOffsetType> metadataOffsets) {
        SequenceOffsetType sequence = metadataOffsets.get(partition);
        if (sequence != null) {
            log.debug("Getting sequence [%s] from metadata storage for partition [%s]", new Object[]{sequence, partition});
            if (!this.taskTuningConfig.isSkipSequenceNumberAvailabilityCheck() && !this.checkOffsetAvailability(partition, sequence)) {
                if (this.taskTuningConfig.isResetOffsetAutomatically()) {
                    this.resetInternal(this.createDataSourceMetaDataForReset(this.ioConfig.getStream(), (Map<PartitionIdType, SequenceOffsetType>)ImmutableMap.of(partition, sequence)));
                    throw new StreamException((Throwable)new ISE("Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting sequence", new Object[]{sequence, partition}));
                }
                throw new StreamException((Throwable)new ISE("Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous sequenceNumber and start reading from a valid message by using the supervisor's reset API.", new Object[]{sequence, partition}));
            }
            return this.makeSequenceNumber(sequence, this.useExclusiveStartSequenceNumberForNonFirstSequence());
        }
        boolean useEarliestSequenceNumber = this.ioConfig.isUseEarliestSequenceNumber();
        if (this.subsequentlyDiscoveredPartitions.contains(partition)) {
            log.info("Overriding useEarliestSequenceNumber and starting from beginning of newly discovered partition [%s] (which is probably from a split or merge)", new Object[]{partition});
            useEarliestSequenceNumber = true;
        }
        if ((sequence = this.getOffsetFromStreamForPartition(partition, useEarliestSequenceNumber)) == null) {
            throw new ISE("unable to fetch sequence number for partition[%s] from stream", new Object[]{partition});
        }
        log.debug("Getting sequence number [%s] for partition [%s]", new Object[]{sequence, partition});
        return this.makeSequenceNumber(sequence, false);
    }

    protected Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage() {
        SeekableStreamSequenceNumbers partitions;
        DataSourceMetadata dataSourceMetadata = this.retrieveDataSourceMetadata();
        if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata && this.checkSourceMetadataMatch(dataSourceMetadata) && (partitions = ((SeekableStreamDataSourceMetadata)dataSourceMetadata).getSeekableStreamSequenceNumbers()) != null) {
            if (!this.ioConfig.getStream().equals(partitions.getStream())) {
                log.warn("Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences", new Object[]{partitions.getStream(), this.ioConfig.getStream()});
                return Collections.emptyMap();
            }
            if (partitions.getPartitionSequenceNumberMap() != null) {
                return partitions.getPartitionSequenceNumberMap();
            }
        }
        return Collections.emptyMap();
    }

    protected DataSourceMetadata retrieveDataSourceMetadata() {
        return this.indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(this.supervisorId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType partition, boolean useEarliestOffset) {
        this.recordSupplierLock.lock();
        try {
            StreamPartition<PartitionIdType> topicPartition = new StreamPartition<PartitionIdType>(this.ioConfig.getStream(), partition);
            if (!this.recordSupplier.getAssignment().contains(topicPartition)) {
                throw new IllegalStateException("Record supplier does not match current known partitions");
            }
            SequenceOffsetType SequenceOffsetType = useEarliestOffset ? this.recordSupplier.getEarliestSequenceNumber(topicPartition) : this.recordSupplier.getLatestSequenceNumber(topicPartition);
            return SequenceOffsetType;
        }
        finally {
            this.recordSupplierLock.unlock();
        }
    }

    private void createTasksForGroup(int groupId, int replicas) throws JsonProcessingException {
        TaskGroup group = this.activelyReadingTaskGroups.get(groupId);
        ImmutableMap startPartitions = group.startingSequences;
        HashMap endPartitions = new HashMap();
        for (Object partition : startPartitions.keySet()) {
            endPartitions.put(partition, this.getEndOfPartitionMarker());
        }
        Set exclusiveStartSequenceNumberPartitions = this.activelyReadingTaskGroups.get((Object)Integer.valueOf((int)groupId)).exclusiveStartSequenceNumberPartitions;
        DateTime minimumMessageTime = group.minimumMessageTime;
        DateTime maximumMessageTime = group.maximumMessageTime;
        SeekableStreamIndexTaskIOConfig newIoConfig = this.createTaskIoConfig(groupId, (Map<PartitionIdType, SequenceOffsetType>)startPartitions, endPartitions, group.baseSequenceName, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions, this.ioConfig);
        List taskList = this.createIndexTasks(replicas, group.baseSequenceName, this.sortingMapper, group.checkpointSequences, newIoConfig, this.taskTuningConfig, this.rowIngestionMetersFactory);
        for (SeekableStreamIndexTask indexTask : taskList) {
            Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
            if (taskQueue.isPresent()) {
                try {
                    ((TaskQueue)taskQueue.get()).add(indexTask);
                }
                catch (DruidException e) {
                    this.stateManager.recordThrowableEvent(e);
                    log.noStackTrace().error((Throwable)e, "Tried to add task [%s] but encountered error", new Object[]{indexTask.getId()});
                }
                continue;
            }
            log.error("Failed to get task queue because I'm not the leader!", new Object[0]);
        }
    }

    @VisibleForTesting
    public void updateCurrentAndLatestOffsets() {
        if (this.stateManager.isIdle() || this.stateManager.isSteadyState() || !this.stateManager.isHealthy()) {
            Stopwatch runTime = Stopwatch.createStarted();
            try {
                this.updateCurrentOffsets();
                this.updatePartitionLagFromStream();
                this.sequenceLastUpdated = DateTimes.nowUtc();
            }
            catch (Exception e) {
                log.warn((Throwable)e, "Exception while getting current/latest sequences", new Object[0]);
            }
            finally {
                this.emitUpdateOffsetsTime(runTime.millisElapsed());
            }
        }
    }

    private void updateCurrentOffsets() throws InterruptedException, ExecutionException {
        List futures = Stream.concat(this.activelyReadingTaskGroups.values().stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()), this.pendingCompletionTaskGroups.values().stream().flatMap(Collection::stream).flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())).map(task -> Futures.transform(this.taskClient.getCurrentOffsetsAsync((String)task.getKey(), false), currentSequences -> {
            if (currentSequences != null && !currentSequences.isEmpty()) {
                ((TaskData)task.getValue()).currentSequences = currentSequences;
            }
            return null;
        }, (Executor)MoreExecutors.directExecutor())).collect(Collectors.toList());
        this.coalesceAndAwait(futures);
    }

    protected abstract void updatePartitionLagFromStream();

    @Nullable
    protected abstract Map<PartitionIdType, Long> getPartitionRecordLag();

    @Nullable
    protected abstract Map<PartitionIdType, Long> getPartitionTimeLag();

    protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets() {
        Map offsetsFromMetadataStorage = this.getOffsetsFromMetadataStorage();
        if (!(this.spec.isSuspended() || this.activelyReadingTaskGroups.isEmpty() && this.pendingCompletionTaskGroups.isEmpty())) {
            Map<Object, Object> currentOffsets = Stream.concat(this.activelyReadingTaskGroups.values().stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()).flatMap(taskData -> ((TaskData)taskData.getValue()).currentSequences.entrySet().stream()), this.pendingCompletionTaskGroups.values().stream().flatMap(taskGroups -> taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())).flatMap(taskData -> ((TaskData)taskData.getValue()).currentSequences.entrySet().stream())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> this.makeSequenceNumber(v1).compareTo(this.makeSequenceNumber(v2)) > 0 ? v1 : v2));
            this.partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
            return currentOffsets;
        }
        return offsetsFromMetadataStorage;
    }

    private OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType seq) {
        return this.makeSequenceNumber(seq, false);
    }

    @VisibleForTesting
    public boolean isStarted() {
        return this.started;
    }

    @VisibleForTesting
    public boolean isLifecycleStarted() {
        return this.lifecycleStarted;
    }

    @VisibleForTesting
    public int getInitRetryCounter() {
        return this.initRetryCounter;
    }

    @VisibleForTesting
    public SeekableStreamSupervisorIOConfig getIoConfig() {
        return this.ioConfig;
    }

    public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) {
        Preconditions.checkNotNull((Object)checkpointMetadata, (Object)"checkpointMetadata");
        SeekableStreamDataSourceMetadata seekableMetadata = (SeekableStreamDataSourceMetadata)checkpointMetadata;
        Preconditions.checkArgument((boolean)this.spec.getIoConfig().getStream().equals(seekableMetadata.getSeekableStreamSequenceNumbers().getStream()), (String)"Supervisor stream [%s] and stream in checkpoint [%s] does not match", (Object)this.spec.getIoConfig().getStream(), (Object)seekableMetadata.getSeekableStreamSequenceNumbers().getStream());
        Preconditions.checkArgument((boolean)(seekableMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers), (Object)"checkpointMetadata must be SeekableStreamStartSequenceNumbers");
        log.info("Checkpointing [%s] for taskGroup [%s]", new Object[]{checkpointMetadata, taskGroupId});
        this.addNotice(new CheckpointNotice(taskGroupId, seekableMetadata));
    }

    @VisibleForTesting
    public Map<String, Object> createBaseTaskContexts() {
        HashMap<String, Object> contexts = new HashMap<String, Object>();
        if (this.spec.getContext() != null) {
            contexts.putAll(this.spec.getContext());
        }
        return contexts;
    }

    @VisibleForTesting
    public ConcurrentHashMap<Integer, Set<PartitionIdType>> getPartitionGroups() {
        return this.partitionGroups;
    }

    @VisibleForTesting
    public boolean isPartitionIdsEmpty() {
        return this.partitionIds.isEmpty();
    }

    public ConcurrentHashMap<PartitionIdType, SequenceOffsetType> getPartitionOffsets() {
        return this.partitionOffsets;
    }

    @VisibleForTesting
    public void setPartitionIdsForTests(List<PartitionIdType> partitionIdsForTests) {
        this.partitionIds.clear();
        this.partitionIds.addAll(partitionIdsForTests);
    }

    private Map<String, Task> getActiveTaskMap() {
        Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
        if (taskQueue.isPresent()) {
            return ((TaskQueue)taskQueue.get()).getActiveTasksForDatasource(this.dataSource);
        }
        return CollectionUtils.toMap(this.taskStorage.getActiveTasksByDatasource(this.dataSource), Task::getId, task -> task);
    }

    protected abstract SeekableStreamIndexTaskIOConfig createTaskIoConfig(int var1, Map<PartitionIdType, SequenceOffsetType> var2, Map<PartitionIdType, SequenceOffsetType> var3, String var4, DateTime var5, DateTime var6, Set<PartitionIdType> var7, SeekableStreamSupervisorIOConfig var8);

    protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>> createIndexTasks(int var1, String var2, ObjectMapper var3, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> var4, SeekableStreamIndexTaskIOConfig var5, SeekableStreamIndexTaskTuningConfig var6, RowIngestionMetersFactory var7) throws JsonProcessingException;

    protected abstract int getTaskGroupIdForPartition(PartitionIdType var1);

    protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata var1);

    protected abstract boolean doesTaskMatchSupervisor(Task var1);

    protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(String var1, Map<PartitionIdType, SequenceOffsetType> var2);

    protected abstract OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType var1, boolean var2);

    protected void scheduleReporting(ScheduledExecutorService reportingExec) {
        SeekableStreamSupervisorIOConfig ioConfig = this.spec.getIoConfig();
        SeekableStreamSupervisorTuningConfig tuningConfig = this.spec.getTuningConfig();
        reportingExec.scheduleWithFixedDelay(this::updateCurrentAndLatestOffsets, ioConfig.getStartDelay().getMillis() + 15000L, Math.max(tuningConfig.getOffsetFetchPeriod().getMillis(), 5000L), TimeUnit.MILLISECONDS);
        reportingExec.scheduleAtFixedRate(this::emitLag, ioConfig.getStartDelay().getMillis() + 25000L, this.spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(), TimeUnit.MILLISECONDS);
        reportingExec.scheduleAtFixedRate(this::emitNoticesQueueSize, ioConfig.getStartDelay().getMillis() + 25000L, this.spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(), TimeUnit.MILLISECONDS);
    }

    protected abstract Map<PartitionIdType, Long> getRecordLagPerPartition(Map<PartitionIdType, SequenceOffsetType> var1);

    protected abstract Map<PartitionIdType, Long> getTimeLagPerPartition(Map<PartitionIdType, SequenceOffsetType> var1);

    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> setupRecordSupplier();

    protected abstract SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType> createReportPayload(int var1, boolean var2);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkOffsetAvailability(@NotNull PartitionIdType partition, @NotNull SequenceOffsetType offsetFromMetadata) {
        StreamPartition<PartitionIdType> streamPartition = StreamPartition.of(this.ioConfig.getStream(), partition);
        OrderedSequenceNumber<SequenceOffsetType> sequenceNumber = this.makeSequenceNumber(offsetFromMetadata);
        this.recordSupplierLock.lock();
        if (!this.recordSupplier.getAssignment().contains(streamPartition)) {
            throw new IllegalStateException("Record supplier does not match current known partitions");
        }
        try {
            boolean bl = this.recordSupplier.isOffsetAvailable(streamPartition, sequenceNumber);
            return bl;
        }
        finally {
            this.recordSupplierLock.unlock();
        }
    }

    private <T> List<Either<Throwable, T>> coalesceAndAwait(List<ListenableFuture<T>> futures) throws ExecutionException, InterruptedException {
        return (List)FutureUtils.get((ListenableFuture)FutureUtils.coalesce(futures), (boolean)true);
    }

    protected void emitNoticeProcessTime(String noticeType, long timeInMillis) {
        try {
            this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("noticeType", (Object)noticeType).setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimensionIfNotNull("tags", this.spec.getContextValue("tags")).setMetric("ingest/notices/time", (Number)timeInMillis));
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Unable to emit notices process time", new Object[0]);
        }
    }

    private void emitUpdateOffsetsTime(long timeInMillis) {
        try {
            this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimensionIfNotNull("tags", this.spec.getContextValue("tags")).setMetric(StringUtils.format((String)"ingest/%s/fetchOffsets/time", (Object[])new Object[]{this.spec.getType()}), (Number)timeInMillis));
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Unable to emit updateOffsets time", new Object[0]);
        }
    }

    protected void emitNoticesQueueSize() {
        if (this.spec.isSuspended()) {
            return;
        }
        try {
            this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimensionIfNotNull("tags", this.spec.getContextValue("tags")).setMetric("ingest/notices/queueSize", (Number)this.getNoticesQueueSize()));
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Unable to emit notices queue size", new Object[0]);
        }
    }

    protected void emitLag() {
        boolean unhealthySupervisorOrTasks;
        SupervisorStateManager.State basicState = this.stateManager.getSupervisorState().getBasicState();
        boolean bl = unhealthySupervisorOrTasks = SupervisorStateManager.BasicState.UNHEALTHY_TASKS.equals((Object)basicState) || SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR.equals((Object)basicState);
        if (this.spec.isSuspended() || !this.stateManager.isSteadyState() && !this.stateManager.isIdle() && !unhealthySupervisorOrTasks) {
            return;
        }
        try {
            Map<PartitionIdType, Long> partitionRecordLags = this.getPartitionRecordLag();
            Map<PartitionIdType, Long> partitionTimeLags = this.getPartitionTimeLag();
            if (partitionRecordLags == null && partitionTimeLags == null) {
                throw new ISE("Latest offsets have not been fetched", new Object[0]);
            }
            String type = this.spec.getType();
            BiConsumer<Map, String> emitFn = (partitionLags, suffix) -> {
                long staleMillis;
                if (partitionLags == null) {
                    return;
                }
                long l = staleMillis = this.sequenceLastUpdated == null ? 0L : DateTimes.nowUtc().getMillis() - (this.tuningConfig.getOffsetFetchPeriod().getMillis() + this.sequenceLastUpdated.getMillis());
                if (staleMillis > 0L && partitionLags.values().stream().anyMatch(x -> x < 0L)) {
                    if (staleMillis / this.getIoConfig().getPeriod().getMillis() % 20L == 0L) {
                        log.warn("Lag is negative and will not be emitted because topic offsets have become stale. This will not impact data processing. Offsets may become stale because of connectivity issues.", new Object[0]);
                    }
                    return;
                }
                LagStats lagStats = this.aggregatePartitionLags((Map<PartitionIdType, Long>)partitionLags);
                Map metricTags = (Map)this.spec.getContextValue("tags");
                for (Map.Entry entry : partitionLags.entrySet()) {
                    this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimension("stream", (Object)this.getIoConfig().getStream()).setDimension("partition", entry.getKey()).setDimensionIfNotNull("tags", (Object)metricTags).setMetric(StringUtils.format((String)"ingest/%s/partitionLag%s", (Object[])new Object[]{type, suffix}), (Number)entry.getValue()));
                }
                this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimension("stream", (Object)this.getIoConfig().getStream()).setDimensionIfNotNull("tags", (Object)metricTags).setMetric(StringUtils.format((String)"ingest/%s/lag%s", (Object[])new Object[]{type, suffix}), (Number)lagStats.getTotalLag()));
                this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimension("stream", (Object)this.getIoConfig().getStream()).setDimensionIfNotNull("tags", (Object)metricTags).setMetric(StringUtils.format((String)"ingest/%s/maxLag%s", (Object[])new Object[]{type, suffix}), (Number)lagStats.getMaxLag()));
                this.emitter.emit((ServiceEventBuilder)ServiceMetricEvent.builder().setDimension("supervisorId", (Object)this.supervisorId).setDimension("dataSource", (Object)this.dataSource).setDimension("stream", (Object)this.getIoConfig().getStream()).setDimensionIfNotNull("tags", (Object)metricTags).setMetric(StringUtils.format((String)"ingest/%s/avgLag%s", (Object[])new Object[]{type, suffix}), (Number)lagStats.getAvgLag()));
            };
            emitFn.accept(partitionRecordLags, "");
            emitFn.accept(partitionTimeLags, "/time");
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Unable to compute lag", new Object[0]);
        }
    }

    protected LagStats aggregatePartitionLags(Map<PartitionIdType, Long> partitionLags) {
        return this.spec.getIoConfig().getLagAggregator().aggregate(partitionLags);
    }

    protected abstract SequenceOffsetType getNotSetMarker();

    protected abstract SequenceOffsetType getEndOfPartitionMarker();

    protected abstract boolean isEndOfShard(SequenceOffsetType var1);

    protected abstract boolean isShardExpirationMarker(SequenceOffsetType var1);

    protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence();

    private class GracefulShutdownNotice
    extends ShutdownNotice {
        private static final String TYPE = "graceful_shutdown_notice";

        private GracefulShutdownNotice() {
        }

        @Override
        public void handle() throws InterruptedException, ExecutionException {
            SeekableStreamSupervisor.this.gracefulShutdownInternal();
            super.handle();
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }

    private static interface Notice {
        public String getType();

        public void handle() throws ExecutionException, InterruptedException;
    }

    private class ShutdownNotice
    implements Notice {
        private static final String TYPE = "shutdown_notice";

        private ShutdownNotice() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle() throws InterruptedException, ExecutionException {
            SeekableStreamSupervisor.this.recordSupplier.close();
            Object object = SeekableStreamSupervisor.this.stopLock;
            synchronized (object) {
                SeekableStreamSupervisor.this.stopped = true;
                SeekableStreamSupervisor.this.stopLock.notifyAll();
            }
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }

    private class ResetNotice
    implements Notice {
        final DataSourceMetadata dataSourceMetadata;
        private static final String TYPE = "reset_notice";

        ResetNotice(DataSourceMetadata dataSourceMetadata) {
            this.dataSourceMetadata = dataSourceMetadata;
        }

        @Override
        public void handle() {
            SeekableStreamSupervisor.this.resetInternal(this.dataSourceMetadata);
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }

    private class ResetOffsetsNotice
    implements Notice {
        final DataSourceMetadata dataSourceMetadata;
        private static final String TYPE = "reset_offsets_notice";

        ResetOffsetsNotice(DataSourceMetadata dataSourceMetadata) {
            this.dataSourceMetadata = dataSourceMetadata;
        }

        @Override
        public void handle() {
            SeekableStreamSupervisor.this.resetOffsetsInternal(this.dataSourceMetadata);
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }

    @VisibleForTesting
    public class TaskGroup {
        final int groupId;
        final ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
        final ImmutableMap<PartitionIdType, SequenceOffsetType> unfilteredStartingSequencesForSequenceName;
        final ConcurrentHashMap<String, TaskData> tasks = new ConcurrentHashMap();
        final DateTime minimumMessageTime;
        final DateTime maximumMessageTime;
        final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
        final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpointSequences = new TreeMap();
        final String baseSequenceName;
        DateTime completionTimeout;
        boolean handoffEarly = false;

        TaskGroup(int groupId, @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences, @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> unfilteredStartingSequencesForSequenceName, @Nullable DateTime minimumMessageTime, @Nullable DateTime maximumMessageTime, Set<PartitionIdType> exclusiveStartSequenceNumberPartitions) {
            this(groupId, startingSequences, unfilteredStartingSequencesForSequenceName, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions, this$0.generateSequenceName(unfilteredStartingSequencesForSequenceName == null ? startingSequences : unfilteredStartingSequencesForSequenceName, minimumMessageTime, maximumMessageTime, this$0.spec.getDataSchema(), this$0.taskTuningConfig));
        }

        TaskGroup(int groupId, @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences, ImmutableMap<PartitionIdType, SequenceOffsetType> unfilteredStartingSequencesForSequenceName, DateTime minimumMessageTime, DateTime maximumMessageTime, Set<PartitionIdType> exclusiveStartSequenceNumberPartitions, String baseSequenceName) {
            this.groupId = groupId;
            this.startingSequences = startingSequences;
            this.unfilteredStartingSequencesForSequenceName = unfilteredStartingSequencesForSequenceName == null ? startingSequences : unfilteredStartingSequencesForSequenceName;
            this.minimumMessageTime = minimumMessageTime;
            this.maximumMessageTime = maximumMessageTime;
            this.checkpointSequences.put(0, (Map)startingSequences);
            this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null ? exclusiveStartSequenceNumberPartitions : Collections.emptySet();
            this.baseSequenceName = baseSequenceName;
        }

        void addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint) {
            this.checkpointSequences.put(this.checkpointSequences.lastKey() + 1, checkpoint);
        }

        Set<String> taskIds() {
            return this.tasks.keySet();
        }

        void setHandoffEarly() {
            this.handoffEarly = true;
        }

        Boolean getHandoffEarly() {
            return this.handoffEarly;
        }

        @VisibleForTesting
        public String getBaseSequenceName() {
            return this.baseSequenceName;
        }

        public String toString() {
            return "TaskGroup{groupId=" + this.groupId + ", tasks=" + String.valueOf(this.tasks) + "}";
        }
    }

    private class TaskData {
        volatile TaskStatus status;
        volatile DateTime startTime;
        volatile Map<PartitionIdType, SequenceOffsetType> currentSequences = new HashMap();

        private TaskData() {
        }

        public String toString() {
            return "TaskData{status=" + String.valueOf(this.status) + ", startTime=" + String.valueOf(this.startTime) + ", checkpointSequences=" + String.valueOf(this.currentSequences) + "}";
        }
    }

    private static class StatsFromTaskResult {
        private final String groupId;
        private final String taskId;
        private final Map<String, Object> stats;

        public StatsFromTaskResult(int groupId, String taskId, Map<String, Object> stats) {
            this.groupId = String.valueOf(groupId);
            this.taskId = taskId;
            this.stats = stats;
        }

        public String getGroupId() {
            return this.groupId;
        }

        public String getTaskId() {
            return this.taskId;
        }

        public Map<String, Object> getStats() {
            return this.stats;
        }
    }

    private static class ErrorsFromTaskResult {
        private final String groupId;
        private final String taskId;
        private final List<ParseExceptionReport> errors;

        public ErrorsFromTaskResult(int groupId, String taskId, List<ParseExceptionReport> errors) {
            this.groupId = String.valueOf(groupId);
            this.taskId = taskId;
            this.errors = errors;
        }

        public List<ParseExceptionReport> getErrors() {
            return this.errors;
        }
    }

    private class HandoffTaskGroupsNotice
    implements Notice {
        final List<Integer> taskGroupIds;
        private static final String TYPE = "handoff_task_group_notice";

        HandoffTaskGroupsNotice(List<Integer> taskGroupIds) {
            this.taskGroupIds = taskGroupIds;
        }

        @Override
        public void handle() {
            for (Integer taskGroupId : this.taskGroupIds) {
                TaskGroup taskGroup = SeekableStreamSupervisor.this.activelyReadingTaskGroups.getOrDefault(taskGroupId, null);
                if (taskGroup == null) {
                    log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", new Object[]{taskGroupId, SeekableStreamSupervisor.this.supervisorId});
                    continue;
                }
                log.info("Task group [%d] for supervisor [%s] will handoff early.", new Object[]{taskGroupId, SeekableStreamSupervisor.this.supervisorId});
                taskGroup.setHandoffEarly();
            }
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }

    protected class CheckpointNotice
    implements Notice {
        private final int taskGroupId;
        private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;
        private static final String TYPE = "checkpoint_notice";

        CheckpointNotice(int taskGroupId, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata) {
            this.taskGroupId = taskGroupId;
            this.checkpointMetadata = checkpointMetadata;
        }

        @Override
        public void handle() throws ExecutionException, InterruptedException {
            TaskGroup taskGroup = SeekableStreamSupervisor.this.activelyReadingTaskGroups.get(this.taskGroupId);
            if (this.isValidTaskGroup(this.taskGroupId, taskGroup)) {
                int sequenceId;
                Map checkpoint;
                TreeMap checkpoints = taskGroup.checkpointSequences;
                int index = checkpoints.size();
                Iterator<Integer> iterator = checkpoints.descendingKeySet().iterator();
                while (iterator.hasNext() && !(checkpoint = checkpoints.get(sequenceId = iterator.next().intValue())).equals(this.checkpointMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap())) {
                    --index;
                }
                if (index == 0) {
                    throw new ISE("No such previous checkpoint [%s] found", new Object[]{this.checkpointMetadata});
                }
                if (index < checkpoints.size()) {
                    Preconditions.checkState((index == checkpoints.size() - 1 ? 1 : 0) != 0, (Object)"checkpoint consistency failure");
                    log.info("Already checkpointed with sequences [%s]", new Object[]{checkpoints.lastEntry().getValue()});
                    return;
                }
                Map newCheckpoint = (Map)SeekableStreamSupervisor.this.checkpointTaskGroup(taskGroup, false).get();
                if (MapUtils.isNotEmpty((Map)newCheckpoint)) {
                    taskGroup.addNewCheckpoint(newCheckpoint);
                    log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", new Object[]{newCheckpoint, this.taskGroupId});
                } else {
                    log.warn("New checkpoint is null for taskGroup [%s]", new Object[]{this.taskGroupId});
                }
            }
        }

        boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) {
            if (taskGroup == null) {
                if (SeekableStreamSupervisor.this.pendingCompletionTaskGroups.containsKey(taskGroupId)) {
                    log.warn("Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for publishing segments", new Object[]{taskGroupId});
                    return false;
                }
                if (SeekableStreamSupervisor.this.partitionGroups.containsKey(taskGroupId)) {
                    log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", new Object[]{taskGroupId});
                    return false;
                }
                throw new ISE("Cannot find taskGroup [%s] among all activelyReadingTaskGroups [%s]", new Object[]{taskGroupId, SeekableStreamSupervisor.this.activelyReadingTaskGroups});
            }
            return true;
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }

    private final class RunNotice
    implements Notice {
        private static final String TYPE = "run_notice";

        private RunNotice() {
        }

        @Override
        public void handle() {
            long nowTime = System.currentTimeMillis();
            if (nowTime - SeekableStreamSupervisor.this.lastRunTime < 1000L) {
                return;
            }
            SeekableStreamSupervisor.this.lastRunTime = nowTime;
            SeekableStreamSupervisor.this.runInternal();
        }

        @Override
        public String getType() {
            return TYPE;
        }

        public int hashCode() {
            return 0;
        }

        public boolean equals(Object obj) {
            return obj != null && obj.getClass().equals(RunNotice.class);
        }
    }

    private class DynamicAllocationTasksNotice
    implements Notice {
        Callable<Integer> computeDesiredTaskCount;
        ServiceEmitter emitter;
        Runnable onSuccessfulScale;
        private static final String TYPE = "dynamic_allocation_tasks_notice";

        DynamicAllocationTasksNotice(Callable<Integer> computeDesiredTaskCount, Runnable onSuccessfulScale, ServiceEmitter emitter) {
            this.computeDesiredTaskCount = computeDesiredTaskCount;
            this.onSuccessfulScale = onSuccessfulScale;
            this.emitter = emitter;
        }

        @Override
        public void handle() {
            if (SeekableStreamSupervisor.this.autoScalerConfig == null) {
                log.warn("autoScalerConfig is null but dynamic allocation notice is submitted, how can it be ?", new Object[0]);
            } else {
                try {
                    boolean allocationSuccess;
                    long nowTime = System.currentTimeMillis();
                    if (SeekableStreamSupervisor.this.spec.isSuspended()) {
                        log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended", new Object[]{SeekableStreamSupervisor.this.supervisorId});
                        return;
                    }
                    if (SupervisorStateManager.BasicState.IDLE == SeekableStreamSupervisor.this.getState()) {
                        log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is idle", new Object[]{SeekableStreamSupervisor.this.supervisorId});
                        return;
                    }
                    log.debug("PendingCompletionTaskGroups is [%s] for supervisor[%s] for dataSource[%s]", new Object[]{SeekableStreamSupervisor.this.pendingCompletionTaskGroups, SeekableStreamSupervisor.this.supervisorId, SeekableStreamSupervisor.this.dataSource});
                    for (CopyOnWriteArrayList<TaskGroup> list : SeekableStreamSupervisor.this.pendingCompletionTaskGroups.values()) {
                        if (list.isEmpty()) continue;
                        log.info("Skipping DynamicAllocationTasksNotice execution for supervisor[%s] for datasource[%s] because following tasks are pending [%s]", new Object[]{SeekableStreamSupervisor.this.supervisorId, SeekableStreamSupervisor.this.dataSource, list});
                        return;
                    }
                    Integer desiredTaskCount = this.computeDesiredTaskCount.call();
                    ServiceMetricEvent.Builder event = ServiceMetricEvent.builder().setDimension("supervisorId", (Object)SeekableStreamSupervisor.this.supervisorId).setDimension("dataSource", (Object)SeekableStreamSupervisor.this.dataSource).setDimension("stream", (Object)SeekableStreamSupervisor.this.getIoConfig().getStream());
                    if (nowTime - SeekableStreamSupervisor.this.dynamicTriggerLastRunTime < SeekableStreamSupervisor.this.autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
                        log.info("DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%s], active task count is [%s]", new Object[]{nowTime - SeekableStreamSupervisor.this.dynamicTriggerLastRunTime, SeekableStreamSupervisor.this.autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), SeekableStreamSupervisor.this.supervisorId, SeekableStreamSupervisor.this.dataSource, desiredTaskCount, SeekableStreamSupervisor.this.getActiveTaskGroupsCount()});
                        if (desiredTaskCount > 0) {
                            this.emitter.emit((ServiceEventBuilder)event.setDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, (Object)"minTriggerScaleActionFrequencyMillis not elapsed yet").setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, (Number)desiredTaskCount));
                        }
                        return;
                    }
                    if (desiredTaskCount > 0) {
                        this.emitter.emit((ServiceEventBuilder)event.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, (Number)desiredTaskCount));
                    }
                    if (allocationSuccess = SeekableStreamSupervisor.this.changeTaskCount(desiredTaskCount)) {
                        this.onSuccessfulScale.run();
                        SeekableStreamSupervisor.this.dynamicTriggerLastRunTime = nowTime;
                    }
                }
                catch (Exception ex) {
                    log.warn((Throwable)ex, "Error parsing DynamicAllocationTasksNotice", new Object[0]);
                }
            }
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }
}

