/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.kafka.KafkaDataSourceMetadata;
import io.druid.indexing.kafka.KafkaIOConfig;
import io.druid.indexing.kafka.KafkaIndexTask;
import io.druid.indexing.kafka.KafkaIndexTaskClient;
import io.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import io.druid.indexing.kafka.KafkaPartitions;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorReport;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import io.druid.indexing.kafka.supervisor.TaskReportData;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerListener;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.metadata.EntryExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.DateTime;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class KafkaSupervisor
implements Supervisor {
    // Logger used throughout this class; also used to emit alerts via makeAlert(...).
    private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
    // Randomness source for the consumer group.id suffix built by getRandomId().
    private static final Random RANDOM = new Random();
    // NOTE(review): not referenced by name in the decompiled methods; the literal 1000L in
    // start() and updateTaskStatus() is presumably this constant inlined by the compiler.
    private static final long MAX_RUN_FREQUENCY_MILLIS = 1000L;
    // NOTE(review): presumably the inlined -1L placeholder that updatePartitionDataFromKafka()
    // stores for a newly discovered partition - confirm against the original source.
    private static final long NOT_SET = -1L;
    // Task groups currently tracked by the supervisor, keyed by task group id.
    private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new ConcurrentHashMap();
    // Per task group id: groups that were signalled to finish (checkTaskDuration) or whose
    // tasks were discovered in PUBLISHING state (discoverTasks).
    private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap();
    // Per task group id: map of Kafka partition -> offset (-1 until an offset is recorded).
    private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap();
    private final TaskStorage taskStorage;
    private final TaskMaster taskMaster;
    private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private final KafkaIndexTaskClient taskClient;
    // Copy of the injected mapper with alphabetical property sorting enabled; used when
    // serializing the schema and tuning config for the sequence-name hash.
    private final ObjectMapper sortingMapper;
    private final KafkaSupervisorSpec spec;
    private final String dataSource;
    private final KafkaSupervisorIOConfig ioConfig;
    private final KafkaSupervisorTuningConfig tuningConfig;
    // Task-level tuning config derived from the supervisor tuning config via KafkaTuningConfig.copyOf.
    private final KafkaTuningConfig taskTuningConfig;
    // "KafkaSupervisor-<dataSource>"; used as thread-name prefix and as the task runner listener id.
    private final String supervisorId;
    private final TaskInfoProvider taskInfoProvider;
    // Single thread that takes notices off the queue and handles them (see start()).
    private final ExecutorService exec;
    // Single scheduler thread that runs the task returned by buildRunTask().
    private final ScheduledExecutorService scheduledExec;
    // Pool on which the asynchronous task-client callbacks are executed.
    private final ListeningExecutorService workerExec;
    // Queue of pending notices; producers add to it, the exec thread takes from it.
    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<Notice>();
    // Monitor that stop() waits on until 'stopped' becomes true or the shutdown timeout elapses.
    private final Object stopLock = new Object();
    // Guards start() and stop() against running concurrently.
    private final Object stateChangeLock = new Object();
    // Set once possiblyRegisterListener() has registered a listener with the task runner.
    private boolean listenerRegistered = false;
    private long lastRunTime;
    // Set in start() to now + the configured start delay.
    private volatile DateTime firstRunTime;
    // Kafka consumer created in start(); used to list topic partitions.
    private volatile KafkaConsumer consumer;
    private volatile boolean started = false;
    private volatile boolean stopped = false;

    /**
     * Creates a supervisor for the datasource named in {@code spec}.
     *
     * <p>Besides storing its collaborators, the constructor creates the notice-handling
     * executor, the scheduler, the worker pool and the task client. Nothing is started
     * here; see {@link #start()}.
     *
     * @param taskStorage                       source of task statuses and active tasks
     * @param taskMaster                        gives access to the task runner when available
     * @param indexerMetadataStorageCoordinator used to delete datasource metadata on reset
     * @param taskClientFactory                 factory for the client used to talk to tasks
     * @param mapper                            JSON mapper; a property-sorting copy is kept
     * @param spec                              supervisor spec (schema, IO config, tuning config)
     */
    public KafkaSupervisor(final TaskStorage taskStorage, final TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KafkaIndexTaskClientFactory taskClientFactory, ObjectMapper mapper, KafkaSupervisorSpec spec) {
        this.taskStorage = taskStorage;
        this.taskMaster = taskMaster;
        this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
        this.sortingMapper = mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
        this.spec = spec;
        this.dataSource = spec.getDataSchema().getDataSource();
        this.ioConfig = spec.getIoConfig();
        this.tuningConfig = spec.getTuningConfig();
        this.taskTuningConfig = KafkaTuningConfig.copyOf(this.tuningConfig);
        this.supervisorId = String.format("KafkaSupervisor-%s", this.dataSource);
        this.exec = Execs.singleThreaded(this.supervisorId);
        this.scheduledExec = Execs.scheduledSingleThreaded(this.supervisorId + "-Scheduler-%d");

        // Worker pool size: explicit setting wins, otherwise one thread per task up to 10.
        final int workerThreads;
        if (this.tuningConfig.getWorkerThreads() != null) {
            workerThreads = this.tuningConfig.getWorkerThreads();
        } else {
            workerThreads = Math.min(10, this.ioConfig.getTaskCount());
        }
        this.workerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, this.supervisorId + "-Worker-%d"));
        log.info("Created worker pool with [%d] threads for dataSource [%s]", workerThreads, this.dataSource);

        this.taskInfoProvider = new TaskInfoProvider(){

            @Override
            public TaskLocation getTaskLocation(final String id) {
                Preconditions.checkNotNull(id, "id");
                final Optional<TaskRunner> runner = taskMaster.getTaskRunner();
                if (!runner.isPresent()) {
                    log.error("Failed to get task runner because I'm not the leader!");
                    return TaskLocation.unknown();
                }
                // Look for a running work item with the requested task id.
                final Optional<? extends TaskRunnerWorkItem> match = Iterables.tryFind(runner.get().getRunningTasks(), new Predicate<TaskRunnerWorkItem>(){

                    @Override
                    public boolean apply(TaskRunnerWorkItem workItem) {
                        return id.equals(workItem.getTaskId());
                    }
                });
                return match.isPresent() ? match.get().getLocation() : TaskLocation.unknown();
            }

            @Override
            public Optional<TaskStatus> getTaskStatus(String id) {
                return taskStorage.getStatus(id);
            }
        };

        // Chat thread count: explicit setting wins, otherwise one per task replica up to 10.
        final int chatThreads;
        if (this.tuningConfig.getChatThreads() != null) {
            chatThreads = this.tuningConfig.getChatThreads();
        } else {
            chatThreads = Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas());
        }
        this.taskClient = taskClientFactory.build(this.taskInfoProvider, this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries());
        log.info("Created taskClient with dataSource[%s] chatThreads[%d] httpTimeout[%s] chatRetries[%d]", this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries());
    }

    /**
     * Starts the supervisor.
     *
     * <p>Creates the Kafka consumer, submits the notice-handling loop to {@code exec} and
     * schedules the periodic run task. If anything fails, the consumer (if created) is
     * closed, an alert is emitted and the failure is rethrown.
     *
     * @throws IllegalStateException if already started, or if the executor has been shut down
     */
    public void start() {
        synchronized (stateChangeLock) {
            Preconditions.checkState(!started, "already started");
            Preconditions.checkState(!exec.isShutdown(), "already stopped");
            try {
                consumer = getKafkaConsumer();
                exec.submit(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            // Handle notices one at a time until this thread is interrupted.
                            while (!Thread.currentThread().isInterrupted()) {
                                final Notice notice = notices.take();
                                try {
                                    notice.handle();
                                }
                                catch (Exception e) {
                                    // A failing notice must not end the loop: alert and carry on.
                                    log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource).addData("noticeClass", notice.getClass().getSimpleName()).emit();
                                }
                            }
                        }
                        catch (InterruptedException e) {
                            log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
                        }
                    }
                });
                firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
                // The run period is never allowed to drop below MAX_RUN_FREQUENCY_MILLIS.
                scheduledExec.scheduleAtFixedRate(buildRunTask(), ioConfig.getStartDelay().getMillis(), Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), TimeUnit.MILLISECONDS);
                started = true;
                log.info("Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", dataSource, ioConfig.getStartDelay(), spec.toString());
            }
            catch (Exception e) {
                if (consumer != null) {
                    consumer.close();
                }
                log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource).emit();
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Stops the supervisor.
     *
     * <p>Shuts down the scheduler, unregisters the task runner listener (when a task runner
     * is available), posts a shutdown notice and waits on {@code stopLock} until the
     * {@code stopped} flag is set or the configured shutdown timeout elapses. Afterwards
     * the task client and the remaining executors are shut down.
     *
     * <p>Failures are reported through an alert and are not rethrown. If the waiting thread
     * is interrupted, its interrupt status is restored so the caller can still observe it.
     *
     * @param stopGracefully if {@code true} a GracefulShutdownNotice is posted (managed tasks
     *                       are signalled to complete and publish); otherwise a plain
     *                       ShutdownNotice is posted
     * @throws IllegalStateException if the supervisor has not been started
     */
    public void stop(boolean stopGracefully) {
        synchronized (stateChangeLock) {
            Preconditions.checkState(started, "not started");
            log.info("Beginning shutdown of KafkaSupervisor[%s]", dataSource);
            try {
                // Stop scheduling new runs before anything else.
                scheduledExec.shutdownNow();
                Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
                if (taskRunner.isPresent()) {
                    taskRunner.get().unregisterListener(supervisorId);
                }
                synchronized (stopLock) {
                    if (stopGracefully) {
                        log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
                        notices.add(new GracefulShutdownNotice());
                    } else {
                        log.info("Posting ShutdownNotice");
                        notices.add(new ShutdownNotice());
                    }
                    long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
                    long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
                    // Loop guards against spurious wake-ups; give up once the deadline passes.
                    while (!stopped) {
                        long sleepTime = endTime - System.currentTimeMillis();
                        if (sleepTime <= 0L) {
                            log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis);
                            stopped = true;
                            break;
                        }
                        stopLock.wait(sleepTime);
                    }
                }
                log.info("Shutdown notice handled");
                taskClient.close();
                workerExec.shutdownNow();
                exec.shutdownNow();
                started = false;
                log.info("KafkaSupervisor[%s] has stopped", dataSource);
            }
            catch (Exception e) {
                log.makeAlert(e, "Exception stopping KafkaSupervisor[%s]", dataSource).emit();
                if (e instanceof InterruptedException) {
                    // wait() cleared the interrupt flag when it threw; put it back so the
                    // interruption is not silently lost.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Returns the current supervisor report, generated with the {@code true} flag
     * (the same variant that {@code runInternal()} logs at debug level).
     */
    public SupervisorReport getStatus() {
        final SupervisorReport report = generateReport(true);
        return report;
    }

    /**
     * Requests a reset by queueing a ResetNotice; the reset itself happens later on
     * the notice-handling thread.
     */
    public void reset() {
        log.info("Posting ResetNotice");
        final ResetNotice resetNotice = new ResetNotice();
        notices.add(resetNotice);
    }

    /**
     * Registers a task runner listener, once, if a task runner is currently available.
     *
     * <p>The listener queues a RunNotice whenever any task's status changes and ignores
     * location changes. If no task runner is available nothing happens and a later call
     * will try again.
     */
    public void possiblyRegisterListener() {
        if (listenerRegistered) {
            return;
        }
        final Optional<TaskRunner> runner = taskMaster.getTaskRunner();
        if (!runner.isPresent()) {
            return;
        }
        final TaskRunnerListener listener = new TaskRunnerListener(){

            @Override
            public String getListenerId() {
                return supervisorId;
            }

            @Override
            public void locationChanged(String taskId, TaskLocation newLocation) {
                // location changes are not of interest
            }

            @Override
            public void statusChanged(String taskId, TaskStatus status) {
                notices.add(new RunNotice());
            }
        };
        runner.get().registerListener(listener, MoreExecutors.sameThreadExecutor());
        listenerRegistered = true;
    }

    /**
     * Performs a reset: deletes the datasource metadata entry, kills every task in every
     * tracked task group, then clears the partition groups and task groups.
     */
    @VisibleForTesting
    void resetInternal() {
        final boolean metadataDeleted = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
        log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, metadataDeleted);

        for (TaskGroup group : taskGroups.values()) {
            for (String taskId : group.tasks.keySet()) {
                log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
                killTask(taskId);
            }
        }

        partitionGroups.clear();
        taskGroups.clear();
    }

    /**
     * Prepares all tracked tasks for a graceful shutdown.
     *
     * <p>Tasks whose location is unknown are killed. Every other task has its start time
     * set to the epoch, so that the following {@code checkTaskDuration()} call treats its
     * group as having exceeded the task duration.
     *
     * @throws ExecutionException   propagated from {@code checkTaskDuration()}
     * @throws InterruptedException propagated from {@code checkTaskDuration()}
     */
    @VisibleForTesting
    void gracefulShutdownInternal() throws ExecutionException, InterruptedException {
        for (TaskGroup group : taskGroups.values()) {
            for (Map.Entry<String, TaskData> task : group.tasks.entrySet()) {
                final String taskId = task.getKey();
                final boolean locationUnknown = taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown());
                if (locationUnknown) {
                    killTask(taskId);
                } else {
                    task.getValue().startTime = new DateTime(0L);
                }
            }
        }
        checkTaskDuration();
    }

    /**
     * Executes one supervisor pass: the steps below run strictly in the order listed,
     * followed by logging of the generated report (the {@code true} variant at debug
     * level when debug logging is enabled, otherwise the {@code false} variant at info).
     *
     * @throws ExecutionException   if one of the steps fails while waiting on a future
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @VisibleForTesting
    void runInternal() throws ExecutionException, InterruptedException {
        possiblyRegisterListener();
        updatePartitionDataFromKafka();
        discoverTasks();
        updateTaskStatus();
        checkTaskDuration();
        checkPendingCompletionTasks();
        checkCurrentTaskState();
        createNewTasks();

        final boolean debugEnabled = log.isDebugEnabled();
        final String report = generateReport(debugEnabled).toString();
        if (debugEnabled) {
            log.debug(report);
        } else {
            log.info(report);
        }
    }

    /**
     * Builds the sequence name for a task group.
     *
     * <p>The name is {@code index_kafka_<dataSource>_<hash>}, where the hash is the first
     * 15 hex characters of the SHA-1 of: the serialized data schema, the serialized task
     * tuning config, the group's starting partition offsets formatted as
     * {@code partition(offset)} joined by {@code +}, and the minimum message time in
     * millis (empty when absent).
     *
     * @param groupId id of a task group present in {@code taskGroups}
     * @return the sequence name
     */
    @VisibleForTesting
    String generateSequenceName(int groupId) {
        final StringBuilder offsets = new StringBuilder();
        final ImmutableMap<Integer, Long> startPartitions = taskGroups.get(groupId).partitionOffsets;
        for (Map.Entry<Integer, Long> partitionOffset : startPartitions.entrySet()) {
            offsets.append(String.format("+%d(%d)", partitionOffset.getKey(), partitionOffset.getValue()));
        }
        // Drop the leading '+' separator.
        final String partitionOffsetStr = offsets.toString().substring(1);

        final Optional<DateTime> minimumMessageTime = taskGroups.get(groupId).minimumMessageTime;
        final String minMsgTimeStr;
        if (minimumMessageTime.isPresent()) {
            minMsgTimeStr = String.valueOf(minimumMessageTime.get().getMillis());
        } else {
            minMsgTimeStr = "";
        }

        final String dataSchema;
        final String tuningConfig;
        try {
            dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
            tuningConfig = sortingMapper.writeValueAsString(taskTuningConfig);
        }
        catch (JsonProcessingException e) {
            throw Throwables.propagate(e);
        }

        final String digestInput = dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr;
        final String hashCode = DigestUtils.sha1Hex(digestInput).substring(0, 15);
        return Joiner.on("_").join("index_kafka", dataSource, hashCode);
    }

    /**
     * Generates a random 8-character identifier.
     *
     * <p>Each character is {@code 'a'} plus a 4-bit value (so it lies in {@code 'a'..'p'}),
     * taken from a fresh random int shifted right by four bits per position.
     *
     * @return the random identifier
     */
    private static String getRandomId() {
        final int length = 8;
        final char[] suffix = new char[length];
        for (int pos = 0; pos < length; pos++) {
            final int nibble = (RANDOM.nextInt() >>> (pos * 4)) & 0x0F;
            suffix[pos] = (char) ('a' + nibble);
        }
        return new String(suffix);
    }

    /**
     * Creates a byte-array Kafka consumer from the configured consumer properties.
     *
     * <p>Auto-commit is disabled, the metadata max age is set to 10 seconds and a random
     * group id is assigned; these override any values supplied in the configuration.
     * The thread's context class loader is switched to this class's loader while the
     * consumer is constructed and is always restored afterwards.
     *
     * @return a new consumer
     */
    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        final Properties props = new Properties();
        props.putAll(ioConfig.getConsumerProperties());
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("metadata.max.age.ms", "10000");
        props.setProperty("group.id", String.format("kafka-supervisor-%s", getRandomId()));

        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(getClass().getClassLoader());
            return new KafkaConsumer<byte[], byte[]>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
        finally {
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Refreshes the known partitions of the configured topic from Kafka.
     *
     * <p>Every partition index below the reported partition count is registered in the
     * partition map of its task group with the NOT_SET placeholder offset, unless it is
     * already present. If the topic listing fails, a warning is logged and nothing is
     * changed.
     */
    private void updatePartitionDataFromKafka() {
        final Map<?, ?> topics;
        try {
            topics = consumer.listTopics();
        }
        catch (Exception e) {
            log.warn(e, "Unable to get partition data from Kafka for brokers [%s], are the brokers up?", ioConfig.getConsumerProperties().get("bootstrap.servers"));
            return;
        }

        final List<?> partitions = (List<?>) topics.get(ioConfig.getTopic());
        final int numPartitions = (partitions == null) ? 0 : partitions.size();
        log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, ioConfig.getTopic());

        for (int partition = 0; partition < numPartitions; partition++) {
            final int taskGroupId = getTaskGroupIdForPartition(partition);
            partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
            final ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
            final boolean newlyAdded = partitionMap.putIfAbsent(partition, NOT_SET) == null;
            if (newlyAdded) {
                log.info("New partition [%d] discovered for topic [%s], added to task group [%d]", partition, ioConfig.getTopic(), taskGroupId);
            }
        }
    }

    /**
     * Looks through the active tasks in task storage for Kafka index tasks of this
     * datasource that the supervisor is not tracking yet, and decides what to do with each.
     *
     * <p>For every such task the status is requested asynchronously; the callback (run on
     * the worker pool) then does one of the following:
     * <ul>
     *   <li>status PUBLISHING: the task is added to the pending-completion groups and the
     *       stored partition offsets are raised to the task's current offsets where those
     *       are higher;</li>
     *   <li>otherwise: the task is stopped if its partitions do not all map to the same
     *       task group or if {@code isTaskCurrent} rejects it; else it is added to the
     *       (possibly newly created) task group.</li>
     * </ul>
     * Tasks whose status future did not complete successfully are killed.
     *
     * @throws ExecutionException   if waiting on the combined future fails
     * @throws InterruptedException if interrupted while waiting on the combined future
     */
    private void discoverTasks() throws ExecutionException, InterruptedException {
        int taskCount = 0;
        // futureTaskIds.get(i) is the task id that futures.get(i) belongs to.
        ArrayList futureTaskIds = Lists.newArrayList();
        ArrayList futures = Lists.newArrayList();
        List tasks = this.taskStorage.getActiveTasks();
        for (Task task : tasks) {
            // Only Kafka index tasks for this supervisor's datasource are of interest.
            if (!(task instanceof KafkaIndexTask) || !this.dataSource.equals(task.getDataSource())) continue;
            ++taskCount;
            final KafkaIndexTask kafkaTask = (KafkaIndexTask)task;
            final String taskId = task.getId();
            // The task group id is derived from the first partition in the task's start-partition map.
            Iterator<Integer> it = kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().iterator();
            final Integer taskGroupId = it.hasNext() ? Integer.valueOf(this.getTaskGroupIdForPartition(it.next())) : null;
            if (taskGroupId == null) continue;
            TaskGroup taskGroup = this.taskGroups.get(taskGroupId);
            // Skip tasks that are already tracked, either as pending completion or in the active group.
            if (this.isTaskInPendingCompletionGroups(taskId) || taskGroup != null && taskGroup.tasks.containsKey(taskId)) continue;
            futureTaskIds.add(taskId);
            futures.add(Futures.transform(this.taskClient.getStatusAsync(taskId), (Function)new Function<KafkaIndexTask.Status, Boolean>(){

                public Boolean apply(KafkaIndexTask.Status status) {
                    if (status == KafkaIndexTask.Status.PUBLISHING) {
                        KafkaSupervisor.this.addDiscoveredTaskToPendingCompletionTaskGroups(taskGroupId, taskId, kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap());
                        Map<Integer, Long> publishingTaskCurrentOffsets = KafkaSupervisor.this.taskClient.getCurrentOffsets(taskId, true);
                        for (Map.Entry<Integer, Long> entry : publishingTaskCurrentOffsets.entrySet()) {
                            boolean succeeded;
                            Integer partition = entry.getKey();
                            Long offset = entry.getValue();
                            ConcurrentHashMap partitionOffsets = (ConcurrentHashMap)KafkaSupervisor.this.partitionGroups.get(KafkaSupervisor.this.getTaskGroupIdForPartition(partition));
                            // Compare-and-set retry loop: store the task's offset if none is stored, or
                            // replace a smaller stored offset; retry when replace() loses a race.
                            do {
                                succeeded = true;
                                Long previousOffset = partitionOffsets.putIfAbsent(partition, offset);
                                // 'continue' jumps to the loop condition with succeeded == true, i.e. it leaves the loop.
                                if (previousOffset == null || previousOffset >= offset) continue;
                                succeeded = partitionOffsets.replace(partition, previousOffset, offset);
                            } while (!succeeded);
                        }
                    } else {
                        // Every partition of the task must map to the same task group, otherwise the task is stopped.
                        for (Integer partition : kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet()) {
                            if (taskGroupId.equals(KafkaSupervisor.this.getTaskGroupIdForPartition(partition))) continue;
                            log.warn("Stopping task [%s] which does not match the expected partition allocation", new Object[]{taskId});
                            try {
                                KafkaSupervisor.this.stopTask(taskId, false).get();
                            }
                            catch (InterruptedException | ExecutionException e) {
                                log.warn((Throwable)e, "Exception while stopping task", new Object[0]);
                            }
                            return false;
                        }
                        // Create the task group from this task's start partitions if no group exists for the id yet.
                        if (KafkaSupervisor.this.taskGroups.putIfAbsent(taskGroupId, new TaskGroup((ImmutableMap<Integer, Long>)ImmutableMap.copyOf(kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()), kafkaTask.getIOConfig().getMinimumMessageTime())) == null) {
                            log.debug("Created new task group [%d]", new Object[]{taskGroupId});
                        }
                        if (!KafkaSupervisor.this.isTaskCurrent(taskGroupId, taskId)) {
                            log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", new Object[]{taskId});
                            try {
                                KafkaSupervisor.this.stopTask(taskId, false).get();
                            }
                            catch (InterruptedException | ExecutionException e) {
                                log.warn((Throwable)e, "Exception while stopping task", new Object[0]);
                            }
                            return false;
                        }
                        // The task is acceptable: start tracking it in its group.
                        ((TaskGroup)((KafkaSupervisor)KafkaSupervisor.this).taskGroups.get((Object)taskGroupId)).tasks.putIfAbsent(taskId, new TaskData());
                    }
                    return true;
                }
            }, (Executor)this.workerExec));
        }
        // successfulAsList substitutes null for every future that failed or was cancelled.
        List results = (List)Futures.successfulAsList((Iterable)futures).get();
        for (int i = 0; i < results.size(); ++i) {
            if (results.get(i) != null) continue;
            String taskId = (String)futureTaskIds.get(i);
            log.warn("Task [%s] failed to return status, killing task", new Object[]{taskId});
            this.killTask(taskId);
        }
        log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", new Object[]{taskCount, this.dataSource});
    }

    /**
     * Files a discovered task under the pending-completion groups of {@code groupId}.
     *
     * <p>If a pending group with exactly the same starting partition offsets exists, the
     * task is added to it (if not already there). Otherwise a new pending group is created
     * for the task, with its completion timeout set to now plus the configured completion
     * timeout and no minimum message time.
     *
     * @param groupId            task group id the task belongs to
     * @param taskId             id of the discovered task
     * @param startingPartitions the task's starting partition offsets
     */
    private void addDiscoveredTaskToPendingCompletionTaskGroups(int groupId, String taskId, Map<Integer, Long> startingPartitions) {
        pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
        final CopyOnWriteArrayList<TaskGroup> pendingGroups = pendingCompletionTaskGroups.get(groupId);

        // Find the first pending group that started from the same offsets.
        TaskGroup matchingGroup = null;
        for (TaskGroup candidate : pendingGroups) {
            if (candidate.partitionOffsets.equals(startingPartitions)) {
                matchingGroup = candidate;
                break;
            }
        }

        if (matchingGroup != null) {
            final boolean added = matchingGroup.tasks.putIfAbsent(taskId, new TaskData()) == null;
            if (added) {
                log.info("Added discovered task [%s] to existing pending task group", taskId);
            }
            return;
        }

        log.info("Creating new pending completion task group for discovered task [%s]", taskId);
        final TaskGroup createdGroup = new TaskGroup(ImmutableMap.copyOf(startingPartitions), Optional.<DateTime>absent());
        createdGroup.tasks.put(taskId, new TaskData());
        createdGroup.completionTimeout = DateTime.now().plus(ioConfig.getCompletionTimeout());
        pendingGroups.add(createdGroup);
    }

    /*
     * WARNING - void declaration
     */
    private void updateTaskStatus() throws ExecutionException, InterruptedException {
        void var4_8;
        ArrayList futures = Lists.newArrayList();
        ArrayList futureTaskIds = Lists.newArrayList();
        for (TaskGroup taskGroup : this.taskGroups.values()) {
            for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                String taskId = entry.getKey();
                final TaskData taskData = entry.getValue();
                if (taskData.startTime == null) {
                    futureTaskIds.add(taskId);
                    futures.add(Futures.transform(this.taskClient.getStartTimeAsync(taskId), (Function)new Function<DateTime, Boolean>(){

                        @Nullable
                        public Boolean apply(@Nullable DateTime startTime) {
                            if (startTime == null) {
                                return false;
                            }
                            taskData.startTime = startTime;
                            long millisRemaining = KafkaSupervisor.this.ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() - taskData.startTime.getMillis());
                            if (millisRemaining > 0L) {
                                KafkaSupervisor.this.scheduledExec.schedule(KafkaSupervisor.this.buildRunTask(), millisRemaining + 1000L, TimeUnit.MILLISECONDS);
                            }
                            return true;
                        }
                    }, (Executor)this.workerExec));
                }
                taskData.status = (TaskStatus)this.taskStorage.getStatus(taskId).get();
            }
        }
        for (List list : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup group : list) {
                for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
                    entry.getValue().status = (TaskStatus)this.taskStorage.getStatus(entry.getKey()).get();
                }
            }
        }
        List results = (List)Futures.successfulAsList((Iterable)futures).get();
        boolean bl = false;
        while (var4_8 < results.size()) {
            if (results.get((int)var4_8) == null) {
                String taskId = (String)futureTaskIds.get((int)var4_8);
                log.warn("Task [%s] failed to return start time, killing task", new Object[]{taskId});
                this.killTask(taskId);
            }
            ++var4_8;
        }
    }

    /**
     * Finds task groups that have run for the configured task duration and signals their
     * tasks to finish.
     *
     * <p>A group counts as expired when its earliest task start time plus the task duration
     * lies in the past. For every expired group the result of
     * {@code signalTasksToFinish} is awaited:
     * <ul>
     *   <li>if end offsets were returned, the group gets a completion timeout, is moved to
     *       the pending-completion groups, and the end offsets are stored in the group's
     *       partition map;</li>
     *   <li>otherwise all tasks in the group are killed.</li>
     * </ul>
     * In both cases the group is removed from {@code taskGroups}.
     *
     * <p>NOTE(review): tasks may still have a null start time here; Joda-Time documents a
     * null instant as meaning "now" in {@code isAfter} - confirm this is the intended
     * handling.
     *
     * @throws InterruptedException if interrupted while waiting on the combined future
     * @throws ExecutionException   if waiting on the combined future fails
     */
    private void checkTaskDuration() throws InterruptedException, ExecutionException {
        // expiredGroupIds.get(i) is the group id that finishFutures.get(i) belongs to.
        final List<ListenableFuture<Map<Integer, Long>>> finishFutures = Lists.newArrayList();
        final List<Integer> expiredGroupIds = Lists.newArrayList();

        for (Map.Entry<Integer, TaskGroup> groupEntry : taskGroups.entrySet()) {
            final Integer groupId = groupEntry.getKey();
            final TaskGroup group = groupEntry.getValue();

            DateTime earliestTaskStart = DateTime.now();
            for (TaskData taskData : group.tasks.values()) {
                if (earliestTaskStart.isAfter(taskData.startTime)) {
                    earliestTaskStart = taskData.startTime;
                }
            }

            if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
                log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
                expiredGroupIds.add(groupId);
                finishFutures.add(signalTasksToFinish(groupId));
            }
        }

        // successfulAsList substitutes null for every future that failed or was cancelled.
        final List<Map<Integer, Long>> results = Futures.successfulAsList(finishFutures).get();
        for (int idx = 0; idx < results.size(); idx++) {
            final Integer groupId = expiredGroupIds.get(idx);
            final TaskGroup group = taskGroups.get(groupId);
            final Map<Integer, Long> endOffsets = results.get(idx);

            if (endOffsets == null) {
                log.warn("All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]", groupId, group.tasks.keySet());
                for (String taskId : group.tasks.keySet()) {
                    killTask(taskId);
                }
            } else {
                group.completionTimeout = DateTime.now().plus(ioConfig.getCompletionTimeout());
                pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
                pendingCompletionTaskGroups.get(groupId).add(group);
                // Record where the finished tasks stopped reading.
                for (Map.Entry<Integer, Long> endOffset : endOffsets.entrySet()) {
                    partitionGroups.get(groupId).put(endOffset.getKey(), endOffset.getValue());
                }
            }

            taskGroups.remove(groupId);
        }
    }

    private ListenableFuture<Map<Integer, Long>> signalTasksToFinish(int groupId) {
        TaskGroup taskGroup = this.taskGroups.get(groupId);
        Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
        while (i.hasNext()) {
            Map.Entry<String, TaskData> taskEntry = i.next();
            String taskId = taskEntry.getKey();
            TaskData task = taskEntry.getValue();
            if (task.status.isSuccess()) {
                return Futures.transform(this.stopTasksInGroup(taskGroup), (Function)new Function<Void, Map<Integer, Long>>(){

                    @Nullable
                    public Map<Integer, Long> apply(@Nullable Void input) {
                        return null;
                    }
                }, (Executor)this.workerExec);
            }
            if (!task.status.isRunnable() || !this.taskInfoProvider.getTaskLocation(taskId).equals((Object)TaskLocation.unknown())) continue;
            log.info("Killing task [%s] which hasn't been assigned to a worker", new Object[]{taskId});
            this.killTask(taskId);
            i.remove();
        }
        ArrayList pauseFutures = Lists.newArrayList();
        ImmutableList pauseTaskIds = ImmutableList.copyOf((Collection)taskGroup.tasks.keySet());
        for (String taskId : pauseTaskIds) {
            pauseFutures.add(this.taskClient.pauseAsync(taskId));
        }
        return Futures.transform((ListenableFuture)Futures.successfulAsList((Iterable)pauseFutures), (Function)new Function<List<Map<Integer, Long>>, Map<Integer, Long>>((List)pauseTaskIds, taskGroup, groupId){
            final /* synthetic */ List val$pauseTaskIds;
            final /* synthetic */ TaskGroup val$taskGroup;
            final /* synthetic */ int val$groupId;
            {
                this.val$pauseTaskIds = list;
                this.val$taskGroup = taskGroup;
                this.val$groupId = n;
            }

            /*
             * WARNING - void declaration
             */
            @Nullable
            public Map<Integer, Long> apply(List<Map<Integer, Long>> input) {
                HashMap<Integer, Long> endOffsets = new HashMap<Integer, Long>();
                for (int i = 0; i < input.size(); ++i) {
                    Map<Integer, Long> result = input.get(i);
                    if (result == null || result.isEmpty()) {
                        Iterator taskId = (String)this.val$pauseTaskIds.get(i);
                        log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", new Object[]{taskId});
                        KafkaSupervisor.this.killTask((String)((Object)taskId));
                        this.val$taskGroup.tasks.remove(taskId);
                        continue;
                    }
                    for (Map.Entry entry : result.entrySet()) {
                        if (endOffsets.containsKey(entry.getKey()) && ((Long)endOffsets.get(entry.getKey())).compareTo((Long)entry.getValue()) >= 0) continue;
                        endOffsets.put((Integer)entry.getKey(), (Long)entry.getValue());
                    }
                }
                ArrayList setEndOffsetFutures = Lists.newArrayList();
                ImmutableList setEndOffsetTaskIds = ImmutableList.copyOf((Collection)this.val$taskGroup.tasks.keySet());
                if (setEndOffsetTaskIds.isEmpty()) {
                    log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{this.val$groupId});
                    return null;
                }
                log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", new Object[]{this.val$groupId, endOffsets});
                for (String string : setEndOffsetTaskIds) {
                    setEndOffsetFutures.add(KafkaSupervisor.this.taskClient.setEndOffsetsAsync(string, endOffsets, true));
                }
                try {
                    void var6_12;
                    List results = (List)Futures.successfulAsList((Iterable)setEndOffsetFutures).get();
                    boolean bl = false;
                    while (var6_12 < results.size()) {
                        if (results.get((int)var6_12) == null || !((Boolean)results.get((int)var6_12)).booleanValue()) {
                            String taskId = (String)setEndOffsetTaskIds.get((int)var6_12);
                            log.warn("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", new Object[]{taskId});
                            KafkaSupervisor.this.killTask(taskId);
                            this.val$taskGroup.tasks.remove(taskId);
                        }
                        ++var6_12;
                    }
                }
                catch (Exception e) {
                    Throwables.propagate((Throwable)e);
                }
                if (this.val$taskGroup.tasks.isEmpty()) {
                    log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{this.val$groupId});
                    return null;
                }
                return endOffsets;
            }
        }, (Executor)this.workerExec);
    }

    /**
     * Reviews every task group that is waiting for its tasks to complete.
     *
     * For each group, failed tasks are dropped. As soon as one task reports success the group's tasks
     * are stopped and the group is removed from the pending list. If every task failed, or no task
     * succeeded before the group's completion timeout, the partition group is forgotten, the group
     * and the currently running task group with the same id are stopped, and all later pending groups
     * for that id are stopped as well.
     *
     * @throws ExecutionException   if waiting for the stop futures fails
     * @throws InterruptedException if interrupted while waiting for the stop futures
     */
    private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException {
        List<ListenableFuture<Void>> stopFutures = Lists.newArrayList();

        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingEntry : this.pendingCompletionTaskGroups.entrySet()) {
            Integer groupId = pendingEntry.getKey();
            CopyOnWriteArrayList<TaskGroup> pendingGroups = pendingEntry.getValue();
            List<TaskGroup> finishedGroups = Lists.newArrayList();
            // Set once a group has failed or timed out; every later group in the list is then stopped too.
            boolean stopRemainingGroups = false;

            for (TaskGroup group : pendingGroups) {
                if (stopRemainingGroups) {
                    stopFutures.add(this.stopTasksInGroup(group));
                    finishedGroups.add(group);
                    continue;
                }

                boolean anyTaskSucceeded = false;
                boolean everyTaskFailed = false;
                Iterator<Map.Entry<String, TaskData>> taskIterator = group.tasks.entrySet().iterator();
                while (taskIterator.hasNext()) {
                    Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                    if (taskEntry.getValue().status.isFailure()) {
                        taskIterator.remove();
                        if (group.tasks.isEmpty()) {
                            everyTaskFailed = true;
                            break;
                        }
                    }
                    if (taskEntry.getValue().status.isSuccess()) {
                        log.info("Task [%s] completed successfully, stopping tasks %s", taskEntry.getKey(), group.tasks.keySet());
                        stopFutures.add(this.stopTasksInGroup(group));
                        anyTaskSucceeded = true;
                        finishedGroups.add(group);
                        break;
                    }
                }

                // The timeout is only consulted when no task succeeded.
                boolean timedOut = !anyTaskSucceeded && group.completionTimeout.isBeforeNow();
                if (!timedOut && !everyTaskFailed) {
                    continue;
                }

                if (everyTaskFailed) {
                    log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
                } else {
                    log.makeAlert("No task in [%s] succeeded before the completion timeout elapsed [%s]!", group.tasks.keySet(), this.ioConfig.getCompletionTimeout()).emit();
                }
                this.partitionGroups.remove(groupId);
                stopFutures.add(this.stopTasksInGroup(group));
                stopRemainingGroups = true;
                stopFutures.add(this.stopTasksInGroup(this.taskGroups.remove(groupId)));
                finishedGroups.add(group);
            }

            pendingGroups.removeAll(finishedGroups);
        }

        Futures.successfulAsList(stopFutures).get();
    }

    /**
     * Prunes the actively reading task groups.
     *
     * A task that {@link #isTaskCurrent} rejects is stopped and removed from its group, a failed task
     * is removed from its group, and the first successful task found causes the whole group to be
     * stopped and removed from {@code taskGroups}. The method then waits for all stop requests.
     *
     * @throws ExecutionException   if waiting for the stop futures fails
     * @throws InterruptedException if interrupted while waiting for the stop futures
     */
    private void checkCurrentTaskState() throws ExecutionException, InterruptedException {
        List<ListenableFuture<Void>> stopFutures = Lists.newArrayList();

        Iterator<Map.Entry<Integer, TaskGroup>> groupIterator = this.taskGroups.entrySet().iterator();
        while (groupIterator.hasNext()) {
            Map.Entry<Integer, TaskGroup> groupEntry = groupIterator.next();
            Integer groupId = groupEntry.getKey();
            TaskGroup taskGroup = groupEntry.getValue();
            log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.tasks.keySet());

            Iterator<Map.Entry<String, TaskData>> taskIterator = taskGroup.tasks.entrySet().iterator();
            while (taskIterator.hasNext()) {
                Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                String taskId = taskEntry.getKey();
                TaskData taskData = taskEntry.getValue();

                if (!this.isTaskCurrent(groupId, taskId)) {
                    log.info("Stopping task [%s] which does not match the expected offset range and ingestion spec", taskId);
                    stopFutures.add(this.stopTask(taskId, false));
                    taskIterator.remove();
                } else if (taskData.status.isFailure()) {
                    taskIterator.remove();
                } else if (taskData.status.isSuccess()) {
                    // One finished task is enough: stop the rest and drop the group.
                    stopFutures.add(this.stopTasksInGroup(taskGroup));
                    groupIterator.remove();
                    break;
                }
            }
            log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.tasks.keySet());
        }

        Futures.successfulAsList(stopFutures).get();
    }

    /**
     * Creates a task group for every partition group that has none, then tops up each task group
     * until it holds as many tasks as {@code ioConfig.getReplicas()}.
     *
     * When at least one task was created and {@code firstRunTime} is already in the past, another
     * run is scheduled 5000 ms later.
     */
    void createNewTasks() {
        for (Integer groupId : this.partitionGroups.keySet()) {
            if (!this.taskGroups.containsKey(groupId)) {
                log.info("Creating new task group [%d] for partitions %s", groupId, this.partitionGroups.get(groupId).keySet());

                Optional<DateTime> minimumMessageTime;
                if (this.ioConfig.getLateMessageRejectionPeriod().isPresent()) {
                    minimumMessageTime = Optional.of(DateTime.now().minus((ReadableDuration)this.ioConfig.getLateMessageRejectionPeriod().get()));
                } else {
                    minimumMessageTime = Optional.absent();
                }
                this.taskGroups.put(groupId, new TaskGroup(this.generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime));
            }
        }

        boolean createdTask = false;
        for (Map.Entry<Integer, TaskGroup> groupEntry : this.taskGroups.entrySet()) {
            TaskGroup taskGroup = groupEntry.getValue();
            Integer groupId = groupEntry.getKey();
            if (this.ioConfig.getReplicas() > taskGroup.tasks.size()) {
                log.info("Number of tasks [%d] does not match configured numReplicas [%d] in task group [%d], creating more tasks", taskGroup.tasks.size(), this.ioConfig.getReplicas(), groupId);
                this.createKafkaTasksForGroup(groupId, this.ioConfig.getReplicas() - taskGroup.tasks.size());
                createdTask = true;
            }
        }

        if (createdTask && this.firstRunTime.isBeforeNow()) {
            this.scheduledExec.schedule(this.buildRunTask(), 5000L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Builds {@code replicas} Kafka index tasks for the given task group and adds them to the task queue.
     *
     * All tasks share one IO config: they start at the group's partition offsets and every end offset
     * is {@link Long#MAX_VALUE}. Each task id is the group's sequence name joined to a random id by "_".
     *
     * @param groupId  id of the task group the tasks belong to
     * @param replicas number of tasks to create
     */
    private void createKafkaTasksForGroup(int groupId, int replicas) {
        TaskGroup group = this.taskGroups.get(groupId);
        ImmutableMap<Integer, Long> startPartitions = group.partitionOffsets;

        Map<Integer, Long> endPartitions = new HashMap<Integer, Long>();
        for (Integer partition : startPartitions.keySet()) {
            endPartitions.put(partition, Long.MAX_VALUE);
        }

        String sequenceName = this.generateSequenceName(groupId);
        Map<String, String> consumerProperties = Maps.newHashMap(this.ioConfig.getConsumerProperties());
        DateTime minimumMessageTime = group.minimumMessageTime.orNull();
        KafkaPartitions startOffsets = new KafkaPartitions(this.ioConfig.getTopic(), startPartitions);
        KafkaPartitions endOffsets = new KafkaPartitions(this.ioConfig.getTopic(), endPartitions);
        KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(sequenceName, startOffsets, endOffsets, consumerProperties, true, false, minimumMessageTime);

        for (int replica = 0; replica < replicas; ++replica) {
            String taskId = Joiner.on("_").join(sequenceName, KafkaSupervisor.getRandomId());
            KafkaIndexTask indexTask = new KafkaIndexTask(taskId, new TaskResource(sequenceName, 1), this.spec.getDataSchema(), this.taskTuningConfig, kafkaIOConfig, ImmutableMap.<String, Object>of(), null);

            // The queue is looked up for every task, exactly once per loop iteration.
            Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
            if (!taskQueue.isPresent()) {
                log.error("Failed to get task queue because I'm not the leader!");
                continue;
            }
            try {
                taskQueue.get().add(indexTask);
            }
            catch (EntryExistsException e) {
                log.error("Tried to add task [%s] but it already exists", indexTask.getId());
            }
        }
    }

    /**
     * Resolves the starting offset of every partition in a partition group.
     *
     * An offset recorded in {@code partitionGroups} is used as is, unless it is {@code null} or
     * {@code -1}; in that case the offset comes from {@link #getOffsetFromStorageForPartition}.
     *
     * @param groupId id of the partition group
     * @return immutable map from partition to starting offset
     */
    private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int groupId) {
        ImmutableMap.Builder<Integer, Long> startingOffsets = ImmutableMap.builder();
        for (Map.Entry<Integer, Long> partitionOffset : this.partitionGroups.get(groupId).entrySet()) {
            Integer partition = partitionOffset.getKey();
            Long knownOffset = partitionOffset.getValue();
            // NOTE(review): -1 presumably marks "offset not set yet" - confirm against where it is written.
            boolean hasKnownOffset = knownOffset != null && knownOffset != -1L;
            if (hasKnownOffset) {
                startingOffsets.put(partition, knownOffset);
            } else {
                startingOffsets.put(partition, this.getOffsetFromStorageForPartition(partition));
            }
        }
        return startingOffsets.build();
    }

    /**
     * Determines the offset to start reading a partition from.
     *
     * An offset found in metadata storage wins, provided it is not beyond the latest offset Kafka
     * reports. Without a stored offset, Kafka is asked for the earliest or latest offset depending
     * on {@code ioConfig.isUseEarliestOffset()}.
     *
     * @param partition Kafka partition number
     * @return offset to start from
     * @throws ISE if the stored offset is greater than the latest Kafka offset
     */
    private long getOffsetFromStorageForPartition(int partition) {
        Map<Integer, Long> metadataOffsets = this.getOffsetsFromMetadataStorage();
        Long storedOffset = metadataOffsets.get(partition);

        if (storedOffset == null) {
            long kafkaOffset = this.getOffsetFromKafkaForPartition(partition, this.ioConfig.isUseEarliestOffset());
            log.debug("Getting offset [%,d] from Kafka for partition [%d]", kafkaOffset, partition);
            return kafkaOffset;
        }

        long offset = storedOffset;
        log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, partition);
        long latestKafkaOffset = this.getOffsetFromKafkaForPartition(partition, false);
        if (offset > latestKafkaOffset) {
            throw new ISE("Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition[%d] dataSource[%s]. If these messages are no longer available (perhaps you deleted and re-created your Kafka topic) you can use the supervisor reset API to restart ingestion.", offset, latestKafkaOffset, partition, this.dataSource);
        }
        return offset;
    }

    /**
     * Reads the partition offsets stored for this data source in metadata storage.
     *
     * @return the stored partition-to-offset map, or an empty map when there is no Kafka metadata,
     *         no partition information, no offset map, or the stored topic differs from the topic
     *         in the supervisor spec
     */
    private Map<Integer, Long> getOffsetsFromMetadataStorage() {
        DataSourceMetadata dataSourceMetadata = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
        // instanceof is false for null, so this also covers missing metadata.
        if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
            return ImmutableMap.of();
        }

        KafkaPartitions partitions = ((KafkaDataSourceMetadata)dataSourceMetadata).getKafkaPartitions();
        if (partitions == null) {
            return ImmutableMap.of();
        }

        if (!this.ioConfig.getTopic().equals(partitions.getTopic())) {
            log.warn("Topic in metadata storage [%s] doesn't match spec topic [%s], ignoring stored offsets", partitions.getTopic(), this.ioConfig.getTopic());
            return ImmutableMap.of();
        }

        Map<Integer, Long> storedOffsets = partitions.getPartitionOffsetMap();
        if (storedOffsets == null) {
            return ImmutableMap.of();
        }
        return storedOffsets;
    }

    /**
     * Asks the Kafka consumer for the first or last offset of a partition of the configured topic.
     *
     * The partition is assigned to the consumer first if it is not part of the current assignment.
     *
     * @param partition         Kafka partition number
     * @param useEarliestOffset {@code true} to seek to the beginning, {@code false} to seek to the end
     * @return the consumer's position after the seek
     */
    private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset) {
        TopicPartition topicPartition = new TopicPartition(this.ioConfig.getTopic(), partition);

        boolean alreadyAssigned = this.consumer.assignment().contains(topicPartition);
        if (!alreadyAssigned) {
            this.consumer.assign(Lists.newArrayList(topicPartition));
        }

        if (useEarliestOffset) {
            this.consumer.seekToBeginning(topicPartition);
        } else {
            this.consumer.seekToEnd(topicPartition);
        }
        return this.consumer.position(topicPartition);
    }

    /**
     * Checks whether a stored task still belongs to the given task group.
     *
     * @param taskGroupId id of the task group the task is expected to belong to
     * @param taskId      id of the task to look up in task storage
     * @return {@code true} only when the task exists, is a {@code KafkaIndexTask}, and its base
     *         sequence name equals the sequence name generated for the group
     */
    private boolean isTaskCurrent(int taskGroupId, String taskId) {
        Optional<Task> storedTask = this.taskStorage.getTask(taskId);
        if (storedTask.isPresent() && storedTask.get() instanceof KafkaIndexTask) {
            KafkaIndexTask kafkaTask = (KafkaIndexTask)storedTask.get();
            String taskSequenceName = kafkaTask.getIOConfig().getBaseSequenceName();
            return this.generateSequenceName(taskGroupId).equals(taskSequenceName);
        }
        return false;
    }

    /**
     * Requests a stop (without publishing) for every task of the group that is not complete yet.
     *
     * @param taskGroup group whose tasks should be stopped; may be {@code null}
     * @return a future that completes with {@code null} once every stop request has finished or
     *         failed; an already completed future when {@code taskGroup} is {@code null}
     */
    private ListenableFuture<Void> stopTasksInGroup(TaskGroup taskGroup) {
        if (taskGroup == null) {
            return Futures.immediateFuture(null);
        }

        List<ListenableFuture<Void>> stopFutures = Lists.newArrayList();
        for (Map.Entry<String, TaskData> taskEntry : taskGroup.tasks.entrySet()) {
            boolean stillRunning = !taskEntry.getValue().status.isComplete();
            if (stillRunning) {
                stopFutures.add(this.stopTask(taskEntry.getKey(), false));
            }
        }

        // Collapse the list of per-task results into a single Void result.
        Function<List<Void>, Void> discardResults = new Function<List<Void>, Void>(){

            @Nullable
            @Override
            public Void apply(@Nullable List<Void> input) {
                return null;
            }
        };
        return Futures.transform(Futures.successfulAsList(stopFutures), discardResults, this.workerExec);
    }

    /**
     * Asks a task to stop through the task client and kills it if the request does not succeed.
     *
     * @param id      id of the task to stop
     * @param publish passed straight through to {@code taskClient.stopAsync}
     * @return a future that completes with {@code null} after the stop result has been handled
     */
    private ListenableFuture<Void> stopTask(final String id, boolean publish) {
        ListenableFuture<Boolean> stopResult = this.taskClient.stopAsync(id, publish);

        Function<Boolean, Void> killIfNotStopped = new Function<Boolean, Void>(){

            @Nullable
            @Override
            public Void apply(@Nullable Boolean result) {
                boolean stopped = result != null && result.booleanValue();
                if (!stopped) {
                    log.info("Task [%s] failed to stop in a timely manner, killing task", id);
                    KafkaSupervisor.this.killTask(id);
                }
                return null;
            }
        };
        return Futures.transform(stopResult, killIfNotStopped, this.workerExec);
    }

    /**
     * Shuts a task down through the task queue; only logs an error when no task queue is available.
     *
     * @param id id of the task to shut down
     */
    private void killTask(String id) {
        Optional<TaskQueue> taskQueue = this.taskMaster.getTaskQueue();
        if (!taskQueue.isPresent()) {
            log.error("Failed to get task queue because I'm not the leader!");
            return;
        }
        taskQueue.get().shutdown(id);
    }

    /**
     * Maps a partition to a task group id by taking the partition number modulo the configured task count.
     *
     * @param partition Kafka partition number
     * @return {@code partition % ioConfig.getTaskCount()}
     */
    private int getTaskGroupIdForPartition(int partition) {
        final int taskCount = this.ioConfig.getTaskCount();
        return partition % taskCount;
    }

    /**
     * Tells whether any task group in {@code pendingCompletionTaskGroups} contains the given task.
     *
     * @param taskId id of the task to look for
     * @return {@code true} if some pending-completion group holds the task
     */
    private boolean isTaskInPendingCompletionGroups(String taskId) {
        for (List<TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
            for (TaskGroup pendingGroup : pendingGroups) {
                if (pendingGroup.tasks.containsKey(taskId)) {
                    return true;
                }
            }
        }
        return false;
    }

    /*
     * WARNING - void declaration
     */
    private KafkaSupervisorReport generateReport(boolean includeOffsets) {
        int numPartitions = 0;
        for (Map map : this.partitionGroups.values()) {
            numPartitions += map.size();
        }
        KafkaSupervisorReport report = new KafkaSupervisorReport(this.dataSource, DateTime.now(), this.ioConfig.getTopic(), numPartitions, this.ioConfig.getReplicas(), this.ioConfig.getTaskDuration().getMillis() / 1000L);
        ArrayList arrayList = Lists.newArrayList();
        ArrayList futures = Lists.newArrayList();
        try {
            void var7_13;
            for (TaskGroup taskGroup : this.taskGroups.values()) {
                for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                    String taskId = entry.getKey();
                    DateTime startTime = entry.getValue().startTime;
                    Long remainingSeconds = null;
                    if (startTime != null) {
                        remainingSeconds = Math.max(0L, this.ioConfig.getTaskDuration().getMillis() - (DateTime.now().getMillis() - startTime.getMillis())) / 1000L;
                    }
                    arrayList.add(new TaskReportData(taskId, (Map<Integer, Long>)(includeOffsets ? taskGroup.partitionOffsets : null), null, startTime, remainingSeconds, TaskReportData.TaskType.ACTIVE));
                    if (!includeOffsets) continue;
                    futures.add(this.taskClient.getCurrentOffsetsAsync(taskId, false));
                }
            }
            for (List list : this.pendingCompletionTaskGroups.values()) {
                for (TaskGroup taskGroup : list) {
                    for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
                        String taskId = entry.getKey();
                        DateTime startTime = entry.getValue().startTime;
                        Long remainingSeconds = null;
                        if (taskGroup.completionTimeout != null) {
                            remainingSeconds = Math.max(0L, taskGroup.completionTimeout.getMillis() - DateTime.now().getMillis()) / 1000L;
                        }
                        arrayList.add(new TaskReportData(taskId, (Map<Integer, Long>)(includeOffsets ? taskGroup.partitionOffsets : null), null, startTime, remainingSeconds, TaskReportData.TaskType.PUBLISHING));
                        if (!includeOffsets) continue;
                        futures.add(this.taskClient.getCurrentOffsetsAsync(taskId, false));
                    }
                }
            }
            List results = (List)Futures.successfulAsList((Iterable)futures).get();
            boolean bl = false;
            while (var7_13 < arrayList.size()) {
                TaskReportData reportData = (TaskReportData)arrayList.get((int)var7_13);
                if (includeOffsets) {
                    reportData.setCurrentOffsets((Map)results.get((int)var7_13));
                }
                report.addTask(reportData);
                ++var7_13;
            }
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Failed to generate status report", new Object[0]);
        }
        return report;
    }

    /**
     * Creates a runnable that, when executed, adds a new {@link RunNotice} to {@code notices}.
     *
     * @return the runnable
     */
    private Runnable buildRunTask() {
        final Runnable enqueueRunNotice = new Runnable(){

            @Override
            public void run() {
                RunNotice notice = new RunNotice();
                KafkaSupervisor.this.notices.add(notice);
            }
        };
        return enqueueRunNotice;
    }

    /** Notice whose handling delegates to the supervisor's {@code resetInternal()}. */
    private class ResetNotice implements Notice {

        @Override
        public void handle() {
            resetInternal();
        }
    }

    /**
     * Notice that closes the Kafka consumer, marks the supervisor as stopped and wakes up every
     * thread waiting on {@code stopLock}.
     */
    private class ShutdownNotice implements Notice {

        @Override
        public void handle() throws InterruptedException, ExecutionException {
            consumer.close();

            // The flag is flipped and waiters are notified under the same monitor.
            synchronized (stopLock) {
                stopped = true;
                stopLock.notifyAll();
            }
        }
    }

    /**
     * Shutdown notice that first runs the supervisor's {@code gracefulShutdownInternal()} and then
     * performs the regular {@link ShutdownNotice} handling.
     */
    private class GracefulShutdownNotice extends ShutdownNotice {

        @Override
        public void handle() throws InterruptedException, ExecutionException {
            gracefulShutdownInternal();
            super.handle();
        }
    }

    /**
     * Notice that triggers the supervisor's {@code runInternal()}, unless less than 1000 ms have
     * passed since {@code lastRunTime}, in which case the notice is ignored.
     */
    private class RunNotice implements Notice {

        @Override
        public void handle() throws ExecutionException, InterruptedException {
            long now = System.currentTimeMillis();
            boolean ranRecently = now - lastRunTime < 1000L;
            if (!ranRecently) {
                lastRunTime = now;
                runInternal();
            }
        }
    }

    private static interface Notice {
        public void handle() throws ExecutionException, InterruptedException;
    }

    /** Mutable per-task bookkeeping held in {@link TaskGroup#tasks}. */
    private class TaskData {
        /** Task status as last recorded by the supervisor. */
        TaskStatus status;

        /** Start time of the task; other code in this class treats {@code null} as "unknown". */
        DateTime startTime;
    }

    /** A set of tasks that share the same starting partition offsets. */
    private class TaskGroup {
        /** Partition-to-offset map the group was created with. */
        final ImmutableMap<Integer, Long> partitionOffsets;

        /** Optional minimum message time the group was created with. */
        final Optional<DateTime> minimumMessageTime;

        /** Tasks of this group keyed by task id. */
        final ConcurrentHashMap<String, TaskData> tasks = new ConcurrentHashMap<String, TaskData>();

        /** Completion deadline; not assigned by the constructor, so {@code null} until set elsewhere. */
        DateTime completionTimeout;

        public TaskGroup(ImmutableMap<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime) {
            this.partitionOffsets = partitionOffsets;
            this.minimumMessageTime = minimumMessageTime;
        }
    }
}

