/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.Tasks;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;

public class TaskManager {
    // Logger obtained in the constructor from a LogContext built on logPrefix.
    private final Logger log;
    private final Time time;
    // Queried in tryToCompleteRestoration for the changelog partitions that have completed.
    private final ChangelogReader changelogReader;
    private final UUID processId;
    // Passed to the LogContext, to the Tasks registry and to the builder's subscription updates.
    private final String logPrefix;
    private final InternalTopologyBuilder builder;
    private final Admin adminClient;
    // Used to list, lock and unlock task directories and to locate per-task checkpoint files.
    private final StateDirectory stateDirectory;
    // Compared against EXACTLY_ONCE_V2 in handleLostAll and tryCloseCleanAllActiveTasks.
    private final StreamThread.ProcessingMode processingMode;
    // Registry of the tasks this manager works on; created in the constructor.
    private final Tasks tasks;
    // Not final: assigned after construction through setMainConsumer.
    private Consumer<byte[], byte[]> mainConsumer;
    private DeleteRecordsResult deleteRecordsResult;
    // Set to true by handleRebalanceStart and back to false by handleRebalanceComplete.
    private boolean rebalanceInProgress = false;
    // Ids of task directories locked by tryToLockAllNonEmptyTaskDirectories; entries whose task is
    // not owned are unlocked and removed by releaseLockedUnassignedTaskDirectories.
    private final Set<TaskId> lockedTaskDirectories = new HashSet<TaskId>();

    /**
     * Creates a task manager from its collaborators.
     *
     * <p>The {@link Tasks} registry is built here from the log prefix, the topology builder, the
     * metrics instance and the two task creators; the logger is derived from {@code logPrefix}.
     *
     * @param time               time source kept for later use
     * @param changelogReader    reader asked for completed changelogs during restoration
     * @param processId          id returned by {@link #processId()}
     * @param logPrefix          prefix used for the logger and forwarded to collaborators
     * @param streamsMetrics     metrics instance handed to the task registry
     * @param activeTaskCreator  creator handed to the task registry
     * @param standbyTaskCreator creator handed to the task registry
     * @param builder            topology builder returned by {@link #builder()}
     * @param adminClient        admin client kept for later use
     * @param stateDirectory     state directory used for locking and checkpoint lookup
     * @param processingMode     processing mode consulted when closing tasks
     */
    TaskManager(Time time,
                ChangelogReader changelogReader,
                UUID processId,
                String logPrefix,
                StreamsMetricsImpl streamsMetrics,
                ActiveTaskCreator activeTaskCreator,
                StandbyTaskCreator standbyTaskCreator,
                InternalTopologyBuilder builder,
                Admin adminClient,
                StateDirectory stateDirectory,
                StreamThread.ProcessingMode processingMode) {
        this.time = time;
        this.changelogReader = changelogReader;
        this.processId = processId;
        this.logPrefix = logPrefix;
        this.builder = builder;
        this.adminClient = adminClient;
        this.stateDirectory = stateDirectory;
        this.processingMode = processingMode;

        this.tasks = new Tasks(logPrefix, builder, streamsMetrics, activeTaskCreator, standbyTaskCreator);
        this.log = new LogContext(logPrefix).logger(getClass());
    }

    /**
     * Stores the main consumer on this manager and forwards it to the task registry.
     *
     * @param mainConsumer consumer to use for assignment, pause and resume calls
     */
    void setMainConsumer(Consumer<byte[], byte[]> mainConsumer) {
        this.mainConsumer = mainConsumer;
        tasks.setMainConsumer(mainConsumer);
    }

    /**
     * @return the process id supplied at construction time
     */
    public UUID processId() {
        return processId;
    }

    /**
     * @return the topology builder supplied at construction time
     */
    InternalTopologyBuilder builder() {
        return builder;
    }

    /**
     * @return {@code true} after {@link #handleRebalanceStart(Set)} and until
     *         {@link #handleRebalanceComplete()} has run
     */
    boolean isRebalanceInProgress() {
        return rebalanceInProgress;
    }

    /**
     * Marks the beginning of a rebalance.
     *
     * <p>Registers the subscribed topics with the topology builder, attempts to lock every
     * non-empty task directory, and finally raises the rebalance-in-progress flag.
     *
     * @param subscribedTopics topics reported by the subscription metadata
     */
    void handleRebalanceStart(Set<String> subscribedTopics) {
        builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);
        tryToLockAllNonEmptyTaskDirectories();
        rebalanceInProgress = true;
    }

    /**
     * Marks the end of a rebalance.
     *
     * <p>Pauses every partition currently assigned to the main consumer, releases the directory
     * locks held for tasks that are not owned, and clears the rebalance-in-progress flag.
     */
    void handleRebalanceComplete() {
        Set<TopicPartition> currentAssignment = mainConsumer.assignment();
        mainConsumer.pause(currentAssignment);
        releaseLockedUnassignedTaskDirectories();
        rebalanceInProgress = false;
    }

    /**
     * Recovers from corruption of the given tasks by closing them dirty and reviving them.
     *
     * <p>Order of operations:
     * <ol>
     *   <li>corrupted standby tasks are closed dirty and revived first;</li>
     *   <li>all RUNNING or RESTORING tasks that are not in {@code corruptedTasks} are committed;</li>
     *   <li>corrupted active tasks (plus any reported by a {@link TaskCorruptedException} raised
     *       by that commit) are closed dirty and revived.</li>
     * </ol>
     * If the commit times out, the active tasks that were not classified as corrupted are closed
     * dirty and revived without marking their changelogs as corrupted.
     *
     * @param corruptedTasks ids of the tasks reported as corrupted
     */
    void handleCorruption(Set<TaskId> corruptedTasks) {
        Set<Task> activeVictims = new HashSet<>();
        Set<Task> standbyVictims = new HashSet<>();
        for (TaskId corruptedId : corruptedTasks) {
            Task corrupted = tasks.task(corruptedId);
            if (corrupted.isActive()) {
                activeVictims.add(corrupted);
            } else {
                standbyVictims.add(corrupted);
            }
        }

        // Standbys are dealt with before the commit below is attempted.
        closeDirtyAndRevive(standbyVictims, true);

        try {
            Set<Task> healthyTasks = tasks().values().stream()
                .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
                .filter(t -> !corruptedTasks.contains(t.id()))
                .collect(Collectors.toSet());
            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(
                healthyTasks,
                new HashMap<Task, Map<TopicPartition, OffsetAndMetadata>>());
        } catch (TaskCorruptedException e) {
            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the tasks to clean and revive: {}", e.corruptedTasks());
            activeVictims.addAll(tasks.tasks(e.corruptedTasks()));
        } catch (TimeoutException e) {
            log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
            Set<Task> timedOutTasks = new HashSet<>(tasks.activeTasks());
            timedOutTasks.removeAll(activeVictims);
            // Timed-out tasks are revived without flagging their changelogs as corrupted.
            closeDirtyAndRevive(timedOutTasks, false);
        }

        closeDirtyAndRevive(activeVictims, true);
    }

    /**
     * Closes each given task dirty and immediately revives it.
     *
     * <p>Per task: optionally mark its changelog partitions as corrupted, call
     * {@code prepareCommit()} (failures are logged and ignored), suspend it and, when marking as
     * corrupted, force a {@code postCommit(true)} (failures are logged and ignored), close it
     * dirty, register the still-assigned input partitions of active tasks for offset reset, and
     * finally revive the task.
     *
     * @param taskWithChangelogs tasks to close dirty and revive
     * @param markAsCorrupted    whether the tasks' changelogs are marked corrupted and a
     *                           post-commit is enforced after suspension
     */
    private void closeDirtyAndRevive(Collection<Task> taskWithChangelogs, boolean markAsCorrupted) {
        for (Task victim : taskWithChangelogs) {
            Collection<TopicPartition> changelogs = victim.changelogPartitions();
            if (markAsCorrupted) {
                victim.markChangelogAsCorrupted(changelogs);
            }

            // The returned offsets are intentionally discarded; nothing is committed here.
            try {
                victim.prepareCommit();
            } catch (RuntimeException swallowed) {
                log.error("Error flushing cache for corrupted task {} ", victim.id(), swallowed);
            }

            try {
                victim.suspend();
                if (markAsCorrupted) {
                    victim.postCommit(true);
                }
            } catch (RuntimeException swallowed) {
                log.error("Error suspending corrupted task {} ", victim.id(), swallowed);
            }

            victim.closeDirty();

            if (victim.isActive()) {
                Set<TopicPartition> assigned = mainConsumer.assignment();
                Set<TopicPartition> inputs = victim.inputPartitions();
                Set<TopicPartition> toReset = Utils.intersection(HashSet::new, assigned, inputs);
                boolean allInputsAssigned = toReset.equals(inputs);
                if (!allInputsAssigned) {
                    log.warn("Expected the current consumer assignment {} to contain the input partitions {}. Will proceed to recover.", assigned, inputs);
                }
                victim.addPartitionsForOffsetReset(toReset);
            }

            victim.revive();
        }
    }

    /**
     * Reconciles the currently held tasks with a new assignment.
     *
     * <p>Every existing task falls into exactly one bucket:
     * <ul>
     *   <li>still assigned with the same role (active/standby): its input partitions are updated
     *       and it is resumed;</li>
     *   <li>assigned with the opposite role: it is recycled;</li>
     *   <li>no longer assigned: it is closed clean.</li>
     * </ul>
     * Closing and recycling are delegated to {@code handleCloseAndRecycle}. If that step recorded
     * failures, the first one that is not a {@link TaskMigratedException} is thrown (as-is when it
     * is a {@link KafkaException}, otherwise wrapped in a {@link RuntimeException}); when all
     * failures are task migrations, the first recorded one is thrown. Only when there were no
     * failures are the remaining new tasks created.
     *
     * @param activeTasks  newly assigned active tasks and their input partitions
     * @param standbyTasks newly assigned standby tasks and their input partitions
     * @throws RuntimeException if closing or recycling a task failed, as described above
     */
    public void handleAssignment(Map<TaskId, Set<TopicPartition>> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
        this.log.info("Handle new assignment with:\n\tNew active tasks: {}\n\tNew standby tasks: {}\n\tExisting active tasks: {}\n\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), this.activeTaskIds(), this.standbyTaskIds());
        this.builder.addSubscribedTopicsFromAssignment(activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), this.logPrefix);

        // Insertion-ordered so that "first exception" below is well defined.
        LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
        Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
        Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
        Comparator<Task> byId = Comparator.comparing(Task::id);
        Set<Task> tasksToRecycle = new TreeSet<>(byId);
        Set<Task> tasksToCloseClean = new TreeSet<>(byId);
        Set<Task> tasksToCloseDirty = new TreeSet<>(byId);

        for (Task task : this.tasks.allTasks()) {
            if (activeTasks.containsKey(task.id()) && task.isActive()) {
                this.tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                activeTasksToCreate.remove(task.id());
            } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                this.tasks.updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                standbyTasksToCreate.remove(task.id());
            } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
                // Assigned again, but with the opposite role.
                tasksToRecycle.add(task);
            } else {
                tasksToCloseClean.add(task);
            }
        }

        this.handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, taskCloseExceptions);

        if (!taskCloseExceptions.isEmpty()) {
            this.log.error("Hit exceptions while closing / recycling tasks: {}", taskCloseExceptions);
            for (Map.Entry<TaskId, RuntimeException> entry : taskCloseExceptions.entrySet()) {
                RuntimeException failure = entry.getValue();
                if (failure instanceof TaskMigratedException) {
                    continue;
                }
                if (failure instanceof KafkaException) {
                    throw failure;
                }
                throw new RuntimeException("Unexpected failure to close " + taskCloseExceptions.size() + " task(s) [" + taskCloseExceptions.keySet() + "]. First unexpected exception (for task " + entry.getKey() + ") follows.", failure);
            }
            // Only task-migrated failures were recorded: surface the first one.
            Map.Entry<TaskId, RuntimeException> first = taskCloseExceptions.entrySet().iterator().next();
            throw first.getValue();
        }

        this.tasks.createTasks(activeTasksToCreate, standbyTasksToCreate);
    }

    /**
     * Checkpoints, closes and recycles tasks as part of handling a new assignment.
     *
     * <p>Phases, in order:
     * <ol>
     *   <li>every task to close or recycle goes through {@code prepareCommit()}; a task that
     *       still reports offsets is scheduled for a dirty close, a standby task is suspended
     *       and post-committed;</li>
     *   <li>tasks to close clean are closed (active ones also have their producer cleaned up);</li>
     *   <li>tasks to recycle are converted to the opposite role, consuming their entry from the
     *       matching "to create" map;</li>
     *   <li>everything that failed along the way is closed dirty and removed.</li>
     * </ol>
     * The first failure per task is recorded in {@code taskCloseExceptions}; nothing is rethrown
     * from the individual phases.
     *
     * @param tasksToRecycle       tasks changing role; failed ones are removed from this set
     * @param tasksToCloseClean    tasks to close cleanly; failed ones are removed from this set
     * @param tasksToCloseDirty    must be empty on entry; filled with tasks that failed
     * @param activeTasksToCreate  pending active tasks; entries of recycled standbys are removed
     * @param standbyTasksToCreate pending standby tasks; entries of recycled actives are removed
     * @param taskCloseExceptions  collector for the first failure of each task
     * @throws IllegalArgumentException if {@code tasksToCloseDirty} is not empty on entry
     */
    private void handleCloseAndRecycle(Set<Task> tasksToRecycle, Set<Task> tasksToCloseClean, Set<Task> tasksToCloseDirty, Map<TaskId, Set<TopicPartition>> activeTasksToCreate, Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
        if (!tasksToCloseDirty.isEmpty()) {
            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
        }

        // Phase 1: checkpoint everything that is about to be closed or recycled.
        List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
        tasksToCheckpoint.addAll(tasksToRecycle);
        for (Task candidate : tasksToCheckpoint) {
            try {
                Map<TopicPartition, OffsetAndMetadata> pendingOffsets = candidate.prepareCommit();
                if (!pendingOffsets.isEmpty()) {
                    log.error("Task {} should have been committed when it was suspended, but it reports non-empty offsets {} to commit; this means it failed during last commit and hence should be closed dirty", candidate.id(), pendingOffsets);
                    tasksToCloseDirty.add(candidate);
                } else if (!candidate.isActive()) {
                    candidate.suspend();
                    candidate.postCommit(true);
                }
            } catch (RuntimeException e) {
                String message = String.format("Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:", candidate.id());
                log.error(message, e);
                taskCloseExceptions.putIfAbsent(candidate.id(), e);
                tasksToCloseDirty.add(candidate);
            }
        }

        // Phase 2: clean close of whatever survived phase 1.
        tasksToCloseClean.removeAll(tasksToCloseDirty);
        for (Task closing : tasksToCloseClean) {
            try {
                completeTaskCloseClean(closing);
                if (closing.isActive()) {
                    tasks.cleanUpTaskProducerAndRemoveTask(closing.id(), taskCloseExceptions);
                }
            } catch (RuntimeException e) {
                String message = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", closing.id());
                log.error(message, e);
                taskCloseExceptions.putIfAbsent(closing.id(), e);
                tasksToCloseDirty.add(closing);
            }
        }

        // Phase 3: role conversion of whatever survived phase 1.
        tasksToRecycle.removeAll(tasksToCloseDirty);
        for (Task recycled : tasksToRecycle) {
            try {
                if (recycled.isActive()) {
                    Set<TopicPartition> standbyPartitions = standbyTasksToCreate.remove(recycled.id());
                    tasks.convertActiveToStandby((StreamTask) recycled, standbyPartitions, taskCloseExceptions);
                } else {
                    Set<TopicPartition> activePartitions = activeTasksToCreate.remove(recycled.id());
                    tasks.convertStandbyToActive((StandbyTask) recycled, activePartitions);
                }
            } catch (RuntimeException e) {
                String message = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", recycled.id());
                log.error(message, e);
                taskCloseExceptions.putIfAbsent(recycled.id(), e);
                tasksToCloseDirty.add(recycled);
            }
        }

        // Phase 4: dirty close for every task that failed in an earlier phase.
        for (Task failed : tasksToCloseDirty) {
            closeTaskDirty(failed);
            tasks.cleanUpTaskProducerAndRemoveTask(failed.id(), taskCloseExceptions);
        }
    }

    /**
     * Tries to initialize all tasks and to finish restoration of the active ones.
     *
     * <p>First every task is initialized if needed; a {@link LockException} or a
     * {@link TimeoutException} means not everything is running yet. If initialization succeeded
     * for all tasks, each active task whose changelog partitions are all reported as completed
     * by the changelog reader has its restoration completed. When everything is running, the
     * main consumer is resumed on its whole assignment.
     *
     * @param now            current time, forwarded to the per-task timeout bookkeeping
     * @param offsetResetter callback forwarded to {@code Task#completeRestoration}
     * @return {@code true} if all tasks are initialized and all active tasks are restored
     */
    boolean tryToCompleteRestoration(long now, java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
        boolean everythingRunning = true;
        List<Task> actives = new ArrayList<>();

        for (Task task : tasks.allTasks()) {
            try {
                task.initializeIfNeeded();
                task.clearTaskTimeout();
            } catch (LockException lockException) {
                log.debug("Could not initialize task {} since: {}; will retry", task.id(), lockException.getMessage());
                everythingRunning = false;
            } catch (TimeoutException timeoutException) {
                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                everythingRunning = false;
            }

            if (task.isActive()) {
                actives.add(task);
            }
        }

        if (everythingRunning && !actives.isEmpty()) {
            Set<TopicPartition> completed = changelogReader.completedChangelogs();
            for (Task active : actives) {
                if (!completed.containsAll(active.changelogPartitions())) {
                    // Still restoring at least one changelog.
                    everythingRunning = false;
                    continue;
                }
                try {
                    active.completeRestoration(offsetResetter);
                    active.clearTaskTimeout();
                } catch (TimeoutException timeoutException) {
                    active.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                    log.debug(String.format("Could not complete restoration for %s due to the following exception; will retry", active.id()), timeoutException);
                    everythingRunning = false;
                }
            }
        }

        if (everythingRunning) {
            mainConsumer.resume(mainConsumer.assignment());
        }
        return everythingRunning;
    }

    /**
     * Handles the revocation of the given partitions.
     *
     * <p>Active tasks whose input partitions are all contained in the (remaining) revoked
     * partitions are treated as revoked; other active tasks that need a commit are collected
     * separately. Revoked tasks are prepared for commit first; only if at least one of them
     * produced offsets are the other commit-needing tasks prepared and committed together with
     * them.
     *
     * <p>Commit failures are handled as follows:
     * <ul>
     *   <li>{@link TaskCorruptedException}: the reported tasks are closed dirty and revived with
     *       their changelogs marked as corrupted;</li>
     *   <li>{@link TimeoutException}: all tasks taking part in the commit are closed dirty and
     *       revived without marking changelogs as corrupted;</li>
     *   <li>any other {@link RuntimeException}: recorded and rethrown at the end; the tasks
     *       taking part in the commit are skipped during post-commit.</li>
     * </ul>
     * Afterwards the non-dirty revoked tasks are post-committed with an enforced checkpoint, the
     * additional tasks (if committed) without one, and every revoked task is suspended. The
     * first recorded exception, if any, is thrown once all of that has been attempted.
     *
     * @param revokedPartitions partitions that are being revoked
     * @throws RuntimeException the first failure recorded while committing, post-committing or
     *                          suspending (suspend failures are wrapped in a StreamsException)
     */
    void handleRevocation(Collection<TopicPartition> revokedPartitions) {
        Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
        Set<Task> revokedActiveTasks = new HashSet<>();
        Set<Task> commitNeededActiveTasks = new HashSet<>();
        Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
        AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        for (Task task : this.activeTaskIterable()) {
            if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                revokedActiveTasks.add(task);
                remainingRevokedPartitions.removeAll(task.inputPartitions());
            } else if (task.commitNeeded()) {
                commitNeededActiveTasks.add(task);
            }
        }

        if (!remainingRevokedPartitions.isEmpty()) {
            this.log.warn("The following partitions {} are missing from the task partitions. It could potentially due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
        }

        this.prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);

        // Non-revoked tasks are only committed when a revoked task forces a commit anyway.
        boolean shouldCommitAdditionalTasks = !consumedOffsetsPerTask.isEmpty();
        if (shouldCommitAdditionalTasks) {
            this.prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
        }

        // Tasks in this set are skipped by the post-commit loops below.
        Set<Task> dirtyTasks = new HashSet<>();
        try {
            this.commitOffsetsOrTransaction(consumedOffsetsPerTask);
        } catch (TaskCorruptedException e) {
            this.log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", e.corruptedTasks());
            dirtyTasks.addAll(this.tasks.tasks(e.corruptedTasks()));
            this.closeDirtyAndRevive(dirtyTasks, true);
        } catch (TimeoutException e) {
            this.log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
            this.closeDirtyAndRevive(dirtyTasks, false);
        } catch (RuntimeException e) {
            this.log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
            firstException.compareAndSet(null, e);
            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
        }

        // Revoked tasks get an enforced checkpoint.
        for (Task task : revokedActiveTasks) {
            if (dirtyTasks.contains(task)) {
                continue;
            }
            try {
                task.postCommit(true);
            } catch (RuntimeException e) {
                this.log.error("Exception caught while post-committing task " + task.id(), e);
                firstException.compareAndSet(null, e);
            }
        }

        // Tasks that merely tagged along with the commit are post-committed without enforcing one.
        if (shouldCommitAdditionalTasks) {
            for (Task task : commitNeededActiveTasks) {
                if (dirtyTasks.contains(task)) {
                    continue;
                }
                try {
                    task.postCommit(false);
                } catch (RuntimeException e) {
                    this.log.error("Exception caught while post-committing task " + task.id(), e);
                    firstException.compareAndSet(null, e);
                }
            }
        }

        // Suspension is attempted for every revoked task, including the dirty ones.
        for (Task task : revokedActiveTasks) {
            try {
                task.suspend();
            } catch (RuntimeException e) {
                this.log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
                firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e));
            }
        }

        RuntimeException failure = firstException.get();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Calls {@code prepareCommit()} on each task and records the non-empty results.
     *
     * @param tasksToPrepare         tasks to prepare for commit
     * @param consumedOffsetsPerTask output map; receives an entry for every task that returned
     *                               at least one committable offset
     */
    private void prepareCommitAndAddOffsetsToMap(Set<Task> tasksToPrepare, Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
        for (Task prepared : tasksToPrepare) {
            Map<TopicPartition, OffsetAndMetadata> offsets = prepared.prepareCommit();
            if (!offsets.isEmpty()) {
                consumedOffsetsPerTask.put(prepared, offsets);
            }
        }
    }

    /**
     * Closes every active task dirty and removes it; standby tasks are left untouched.
     *
     * <p>Failures reported while cleaning up a task's producer go into a throw-away map and are
     * therefore not propagated. Under {@code EXACTLY_ONCE_V2} the thread producer is
     * re-initialized afterwards.
     */
    void handleLostAll() {
        log.debug("Closing lost active tasks as zombies.");

        // Iterate over a snapshot of the registry's task collection.
        Set<Task> snapshot = new HashSet<>(tasks.allTasks());
        for (Task lost : snapshot) {
            if (lost.isActive()) {
                closeTaskDirty(lost);
                tasks.cleanUpTaskProducerAndRemoveTask(lost.id(), new HashMap<TaskId, RuntimeException>());
            }
        }

        boolean usesThreadProducer = processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_V2;
        if (usesThreadProducer) {
            tasks.reInitializeThreadProducer();
        }
    }

    /**
     * Computes the sum of changelog offsets per task.
     *
     * <p>The candidates are the union of the locked task directories and the tasks in the
     * registry. For an owned task that is neither CREATED nor CLOSED the offsets come from the
     * task itself (tasks reporting no changelog offsets are skipped). For every other candidate
     * the offsets are read from the task's checkpoint file, if that file exists; a failure to
     * read it is logged and the task is omitted.
     *
     * @return map from task id to the summed changelog offsets
     */
    public Map<TaskId, Long> getTaskOffsetSums() {
        Map<TaskId, Long> offsetSums = new HashMap<>();

        for (TaskId id : Utils.union(HashSet::new, lockedTaskDirectories, tasks.tasksPerId().keySet())) {
            Task owned = tasks.owned(id) ? tasks.task(id) : null;
            boolean hasLiveOffsets = owned != null
                && owned.state() != Task.State.CREATED
                && owned.state() != Task.State.CLOSED;

            if (hasLiveOffsets) {
                Map<TopicPartition, Long> liveOffsets = owned.changelogOffsets();
                if (liveOffsets.isEmpty()) {
                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
                } else {
                    offsetSums.put(id, sumOfChangelogOffsets(id, liveOffsets));
                }
            } else {
                // Fall back to whatever was last checkpointed on disk.
                File checkpoint = stateDirectory.checkpointFileFor(id);
                try {
                    if (checkpoint.exists()) {
                        offsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpoint).read()));
                    }
                } catch (IOException e) {
                    log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
                }
            }
        }

        return offsetSums;
    }

    /**
     * Rebuilds {@code lockedTaskDirectories} by trying to lock every non-empty task directory.
     *
     * <p>The set is cleared first. Each directory whose name parses to a task id and whose lock
     * could be acquired is recorded. Directories that raise a {@link TaskIdFormatException}
     * are skipped silently.
     */
    private void tryToLockAllNonEmptyTaskDirectories() {
        lockedTaskDirectories.clear();

        for (StateDirectory.TaskDirectory candidate : stateDirectory.listNonEmptyTaskDirectories()) {
            File directory = candidate.file();
            String topologyName = candidate.namedTopology();
            try {
                TaskId id = StateManagerUtil.parseTaskDirectoryName(directory.getName(), topologyName);
                if (stateDirectory.lock(id)) {
                    lockedTaskDirectories.add(id);
                    if (!tasks.owned(id)) {
                        log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
                    }
                }
            } catch (TaskIdFormatException ignored) {
                // Deliberately ignored: presumably the directory name is not a task id, so there
                // is nothing to lock for it.
            }
        }
    }

    /**
     * Unlocks and forgets every locked task directory whose task is not owned.
     *
     * <p>Entries for owned tasks stay in {@code lockedTaskDirectories}. Any exception thrown by
     * {@code StateDirectory#unlock} propagates immediately, leaving the remaining entries
     * untouched.
     *
     * <p>The previous version carried an exception holder that was never written to, followed by
     * a rethrow that could therefore never fire; that dead code has been removed.
     */
    private void releaseLockedUnassignedTaskDirectories() {
        // An explicit iterator is used so entries can be removed while walking the set.
        Iterator<TaskId> taskIdIterator = this.lockedTaskDirectories.iterator();
        while (taskIdIterator.hasNext()) {
            TaskId id = taskIdIterator.next();
            if (!this.tasks.owned(id)) {
                this.stateDirectory.unlock(id);
                taskIdIterator.remove();
            }
        }
    }

    /**
     * Adds up the changelog offsets of one task.
     *
     * <p>Special values: an offset of {@code -2} short-circuits and is returned as-is, an offset
     * of {@code -4} is left out of the sum, and any other negative offset is rejected. If the
     * running sum overflows into the negative range, {@link Long#MAX_VALUE} is returned.
     * NOTE(review): -2 and -4 look like inlined sentinel constants; confirm their names against
     * the Task / OffsetCheckpoint definitions.
     *
     * @param id               task the offsets belong to (used for logging only)
     * @param changelogOffsets offset per changelog partition
     * @return the sum, {@code -2} if any offset is {@code -2}, or {@code Long.MAX_VALUE} on overflow
     * @throws IllegalStateException if an offset is negative and neither {@code -2} nor {@code -4}
     */
    private long sumOfChangelogOffsets(TaskId id, Map<TopicPartition, Long> changelogOffsets) {
        long runningTotal = 0L;
        for (Map.Entry<TopicPartition, Long> entry : changelogOffsets.entrySet()) {
            long offset = entry.getValue();
            if (offset == -2L) {
                return -2L;
            }
            if (offset != -4L) {
                if (offset < 0L) {
                    throw new IllegalStateException("Expected not to get a sentinel offset, but got: " + entry);
                }
                runningTotal += offset;
                if (runningTotal < 0L) {
                    log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
                    return Long.MAX_VALUE;
                }
            }
        }
        return runningTotal;
    }

    /**
     * Closes a task without requiring a successful commit.
     *
     * <p>{@code prepareCommit()} and {@code suspend()} are attempted on a best-effort basis:
     * their runtime failures are logged and otherwise ignored. The task is then removed from the
     * registry's pre-close bookkeeping and closed dirty.
     *
     * @param task task to close
     */
    private void closeTaskDirty(Task task) {
        try {
            // Return value is not needed; nothing gets committed on this path.
            task.prepareCommit();
        } catch (RuntimeException flushFailure) {
            log.error("Error flushing caches of dirty task {} ", task.id(), flushFailure);
        }

        try {
            task.suspend();
        } catch (RuntimeException suspendFailure) {
            log.error("Error suspending dirty task {} ", task.id(), suspendFailure);
        }

        tasks.removeTaskBeforeClosing(task.id());
        task.closeDirty();
    }

    /**
     * Removes the task from the registry's pre-close bookkeeping and then closes it cleanly.
     *
     * @param task task to close
     */
    private void completeTaskCloseClean(Task task) {
        TaskId closingId = task.id();
        tasks.removeTaskBeforeClosing(closingId);
        task.closeClean();
    }

    /**
     * Shuts down all tasks and releases the resources held by this manager.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>snapshot the active tasks (sorted by id) so their producers can be closed later;</li>
     *   <li>try to close active and standby tasks cleanly, collecting those that must be closed
     *       dirty, and close those dirty;</li>
     *   <li>close the per-task producers and the thread producer;</li>
     *   <li>clear the task registry;</li>
     *   <li>release the directory locks of tasks that are not owned.</li>
     * </ol>
     * Steps 3 and 5 go through {@code executeAndMaybeSwallow}, which is given one callback that
     * records the failure in {@code firstException} and one that only logs it.
     *
     * @param clean whether a clean shutdown is requested
     * @throws RuntimeException wrapping the first recorded failure, if any was recorded
     */
    void shutdown(boolean clean) {
        // Typed so it matches the AtomicReference<RuntimeException> parameters of the helpers.
        AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        Set<Task> tasksToCloseDirty = new HashSet<>();

        // Captured before closing, since closing removes tasks from the registry.
        Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));
        activeTasks.addAll(this.tasks.activeTasks());

        tasksToCloseDirty.addAll(this.tryCloseCleanAllActiveTasks(clean, firstException));
        tasksToCloseDirty.addAll(this.tryCloseCleanAllStandbyTasks(clean, firstException));
        for (Task task : tasksToCloseDirty) {
            this.closeTaskDirty(task);
        }

        for (Task activeTask : activeTasks) {
            TaskManager.executeAndMaybeSwallow(clean, () -> this.tasks.closeAndRemoveTaskProducerIfNeeded(activeTask), (RuntimeException e) -> firstException.compareAndSet(null, e), (RuntimeException e) -> this.log.warn("Ignoring an exception while closing task " + activeTask.id() + " producer.", e));
        }
        TaskManager.executeAndMaybeSwallow(clean, this.tasks::closeThreadProducerIfNeeded, (RuntimeException e) -> firstException.compareAndSet(null, e), (RuntimeException e) -> this.log.warn("Ignoring an exception while closing thread producer.", e));

        this.tasks.clear();

        // Runs after the registry is cleared, so no task counts as owned any more.
        TaskManager.executeAndMaybeSwallow(clean, this::releaseLockedUnassignedTaskDirectories, (RuntimeException e) -> firstException.compareAndSet(null, e), (RuntimeException e) -> this.log.warn("Ignoring an exception while unlocking remaining task directories.", e));

        RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw new RuntimeException("Unexpected exception while closing task", fatalException);
        }
    }

    /**
     * Tries to commit and cleanly close all active tasks.
     *
     * <p>When {@code clean} is {@code false} nothing is attempted and all active tasks are
     * returned for a dirty close. Otherwise:
     * <ol>
     *   <li>each active task is prepared for commit; tasks failing here are marked dirty
     *       (a {@link TaskMigratedException} is not recorded as a failure);</li>
     *   <li>under {@code EXACTLY_ONCE_V2}, if any task is already dirty, all prepared tasks are
     *       marked dirty as well and no commit is attempted;</li>
     *   <li>otherwise the collected offsets are committed and every active task is
     *       post-committed with an enforced checkpoint; a timeout or unexpected failure of the
     *       commit marks all prepared tasks dirty, a {@link TaskCorruptedException} marks only
     *       the reported tasks dirty;</li>
     *   <li>the tasks still eligible are suspended and closed cleanly; failures mark them
     *       dirty.</li>
     * </ol>
     *
     * @param clean          whether a clean close should be attempted at all
     * @param firstException holder that receives the first recorded failure
     * @return the tasks that still have to be closed dirty by the caller
     */
    private Collection<Task> tryCloseCleanAllActiveTasks(boolean clean, AtomicReference<RuntimeException> firstException) {
        if (!clean) {
            return this.activeTaskIterable();
        }

        Comparator<Task> byId = Comparator.comparing(Task::id);
        Set<Task> tasksToCommit = new TreeSet<>(byId);
        Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
        Set<Task> tasksToCloseClean = new TreeSet<>(byId);
        Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();

        for (Task task : this.activeTaskIterable()) {
            try {
                Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
                tasksToCommit.add(task);
                if (!committableOffsets.isEmpty()) {
                    consumedOffsetsAndMetadataPerTask.put(task, committableOffsets);
                }
                tasksToCloseClean.add(task);
            } catch (TaskMigratedException e) {
                // Not recorded in firstException; the task is simply closed dirty.
                tasksToCloseDirty.add(task);
            } catch (RuntimeException e) {
                firstException.compareAndSet(null, e);
                tasksToCloseDirty.add(task);
            }
        }

        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_V2 && !tasksToCloseDirty.isEmpty()) {
            // In this mode one failed task means none of the prepared tasks is committed.
            tasksToCloseClean.removeAll(tasksToCommit);
            tasksToCloseDirty.addAll(tasksToCommit);
        } else {
            try {
                this.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
                for (Task task : this.activeTaskIterable()) {
                    try {
                        task.postCommit(true);
                    } catch (RuntimeException e) {
                        this.log.error("Exception caught while post-committing task " + task.id(), e);
                        firstException.compareAndSet(null, e);
                        tasksToCloseDirty.add(task);
                        tasksToCloseClean.remove(task);
                    }
                }
            } catch (TimeoutException timeoutException) {
                firstException.compareAndSet(null, timeoutException);
                tasksToCloseClean.removeAll(tasksToCommit);
                tasksToCloseDirty.addAll(tasksToCommit);
            } catch (TaskCorruptedException taskCorruptedException) {
                firstException.compareAndSet(null, taskCorruptedException);
                Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
                // Typed set (was a raw Set) so the addAll below is checked by the compiler.
                Set<Task> corruptedTasks = tasksToCommit.stream()
                    .filter(candidate -> corruptedTaskIds.contains(candidate.id()))
                    .collect(Collectors.toSet());
                tasksToCloseClean.removeAll(corruptedTasks);
                tasksToCloseDirty.addAll(corruptedTasks);
            } catch (RuntimeException e) {
                this.log.error("Exception caught while committing tasks during shutdown", e);
                firstException.compareAndSet(null, e);
                tasksToCloseClean.removeAll(tasksToCommit);
                tasksToCloseDirty.addAll(tasksToCommit);
            }
        }

        for (Task task : tasksToCloseClean) {
            try {
                task.suspend();
                this.completeTaskCloseClean(task);
            } catch (RuntimeException e) {
                this.log.error("Exception caught while clean-closing task " + task.id(), e);
                firstException.compareAndSet(null, e);
                tasksToCloseDirty.add(task);
            }
        }

        return tasksToCloseDirty;
    }

    /**
     * Tries to checkpoint and cleanly close all standby tasks.
     *
     * <p>When {@code clean} is {@code false} nothing is attempted and all standby tasks are
     * returned. Otherwise each standby task is prepared, post-committed with an enforced
     * checkpoint, suspended and closed cleanly; a task failing anywhere in that sequence is
     * returned for a dirty close. Failures other than {@link TaskMigratedException} are recorded
     * in {@code firstException}.
     *
     * @param clean          whether a clean close should be attempted at all
     * @param firstException holder that receives the first recorded failure
     * @return the standby tasks that still have to be closed dirty by the caller
     */
    private Collection<Task> tryCloseCleanAllStandbyTasks(boolean clean, AtomicReference<RuntimeException> firstException) {
        if (!clean) {
            return standbyTaskIterable();
        }

        Set<Task> dirty = new HashSet<>();
        for (Task standby : standbyTaskIterable()) {
            try {
                standby.prepareCommit();
                standby.postCommit(true);
                standby.suspend();
                completeTaskCloseClean(standby);
            } catch (RuntimeException e) {
                // Task migration is not treated as a failure worth reporting.
                if (!(e instanceof TaskMigratedException)) {
                    firstException.compareAndSet(null, e);
                }
                dirty.add(standby);
            }
        }
        return dirty;
    }

    /** Collects the ids of all tasks yielded by {@code activeTaskStream()} into a set. */
    Set<TaskId> activeTaskIds() {
        Stream<TaskId> ids = activeTaskStream().map(task -> task.id());
        return ids.collect(Collectors.toSet());
    }

    /** Collects the ids of all tasks yielded by {@code standbyTaskStream()} into a set. */
    Set<TaskId> standbyTaskIds() {
        Stream<TaskId> ids = standbyTaskStream().map(task -> task.id());
        return ids.collect(Collectors.toSet());
    }

    /**
     * Returns the id-to-task mapping reported by {@code tasks.tasksPerId()}.
     * The map is passed through as-is; no copy or read-only wrapper is created here.
     */
    Map<TaskId, Task> tasks() {
        Map<TaskId, Task> byId = tasks.tasksPerId();
        return byId;
    }

    /** Builds a map from task id to task for every task yielded by {@code activeTaskStream()}. */
    Map<TaskId, Task> activeTaskMap() {
        Stream<Task> active = activeTaskStream();
        return active.collect(Collectors.toMap(task -> task.id(), task -> task));
    }

    /** Materialises the tasks yielded by {@code activeTaskStream()} into a new list. */
    List<Task> activeTaskIterable() {
        Stream<Task> active = activeTaskStream();
        return active.collect(Collectors.toList());
    }

    /** Streams every task from {@code tasks.allTasks()} for which {@code isActive()} is true. */
    private Stream<Task> activeTaskStream() {
        Collection<Task> everyTask = tasks.allTasks();
        return everyTask.stream().filter(task -> task.isActive());
    }

    /** Builds a map from task id to task for every task yielded by {@code standbyTaskStream()}. */
    Map<TaskId, Task> standbyTaskMap() {
        Stream<Task> standbys = standbyTaskStream();
        return standbys.collect(Collectors.toMap(task -> task.id(), task -> task));
    }

    /** Materialises the tasks yielded by {@code standbyTaskStream()} into a new list. */
    private List<Task> standbyTaskIterable() {
        Stream<Task> standbys = standbyTaskStream();
        return standbys.collect(Collectors.toList());
    }

    /** Streams every task from {@code tasks.allTasks()} for which {@code isActive()} is false. */
    private Stream<Task> standbyTaskStream() {
        Collection<Task> everyTask = tasks.allTasks();
        return everyTask.stream().filter(task -> !task.isActive());
    }

    /**
     * Commits all tasks currently returned by {@code tasks.allTasks()}.
     * The tasks are first copied into a fresh set, which is then handed to {@code commit}.
     *
     * @return whatever {@code commit} returns for that set
     */
    int commitAll() {
        Set<Task> snapshot = new HashSet<>(tasks.allTasks());
        return commit(snapshot);
    }

    /**
     * Hands the fetched records of each partition to the active task registered for that partition.
     *
     * @param records the fetched records, grouped by partition
     * @throws NullPointerException if no active task is found for one of the partitions
     */
    void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
        for (TopicPartition tp : records.partitions()) {
            Task owner = tasks.activeTasksForInputPartition(tp);
            if (owner != null) {
                owner.addRecords(tp, records.records(tp));
                continue;
            }
            // No owning task: log the current task layout, then fail.
            log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", tp, toString(">"));
            throw new NullPointerException("Task was unexpectedly missing for partition " + tp);
        }
    }

    /**
     * Commits the given tasks.
     *
     * <p>If the commit fails with a {@link TimeoutException}, every task whose offsets had already
     * been collected gets {@code maybeInitTaskTimeoutOrThrow} invoked with the current time and that
     * exception, and 0 is returned (unless one of those calls throws).
     *
     * @param tasksToCommit the tasks to consider for committing
     * @return the value returned by the underlying commit helper, or 0 after a handled timeout
     */
    int commit(Collection<Task> tasksToCommit) {
        Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsByTask = new HashMap<>();
        try {
            return commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, offsetsByTask);
        }
        catch (TimeoutException timeout) {
            for (Task pending : offsetsByTask.keySet()) {
                pending.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeout);
            }
            return 0;
        }
    }

    /**
     * Prepares, commits and post-commits the given tasks, filling the supplied map on the way.
     *
     * <p>Returns -1 without touching any task while {@code rebalanceInProgress} is set. Otherwise:
     * every task reporting {@code commitNeeded()} is prepared, and the prepared offsets of active
     * tasks are stored in {@code consumedOffsetsAndMetadataPerTask}; the collected offsets are
     * committed via {@code commitOffsetsOrTransaction}; finally every task that still reports
     * {@code commitNeeded()} has its task timeout cleared and is post-committed.
     *
     * @param tasksToCommit the tasks to consider
     * @param consumedOffsetsAndMetadataPerTask output map receiving the prepared offsets per active task
     * @return -1 during a rebalance, otherwise the number of tasks post-committed
     */
    private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(Collection<Task> tasksToCommit, Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
        if (rebalanceInProgress) {
            return -1;
        }
        for (Task candidate : tasksToCommit) {
            if (candidate.commitNeeded()) {
                Map<TopicPartition, OffsetAndMetadata> prepared = candidate.prepareCommit();
                if (candidate.isActive()) {
                    consumedOffsetsAndMetadataPerTask.put(candidate, prepared);
                }
            }
        }
        commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
        int committedCount = 0;
        for (Task candidate : tasksToCommit) {
            if (candidate.commitNeeded()) {
                candidate.clearTaskTimeout();
                committedCount++;
                candidate.postCommit(false);
            }
        }
        return committedCount;
    }

    /**
     * Commits all active tasks if at least one of them both requested a commit and needs one.
     *
     * @return -1 while {@code rebalanceInProgress} is set; 0 if no active task qualifies;
     *         otherwise the result of committing all active tasks
     */
    int maybeCommitActiveTasksPerUserRequested() {
        if (rebalanceInProgress) {
            return -1;
        }
        boolean commitWanted = activeTaskIterable().stream()
            .anyMatch(task -> task.commitRequested() && task.commitNeeded());
        return commitWanted ? commit(activeTaskIterable()) : 0;
    }

    /**
     * Commits the given per-task offsets using the mechanism selected by {@code processingMode}.
     *
     * <p>Nothing is committed when {@code offsetsPerTask} is empty. Otherwise:
     * <ul>
     *   <li>{@code EXACTLY_ONCE_ALPHA}: each task's offsets are committed through that task's own
     *       producer transaction; a task whose commit times out is recorded as corrupted and the
     *       loop moves on to the next task.</li>
     *   <li>{@code EXACTLY_ONCE_V2}: all offsets are merged and committed in one transaction of the
     *       thread producer; on timeout every task in {@code offsetsPerTask} is recorded as corrupted.</li>
     *   <li>any other mode: all offsets are merged and committed with {@code mainConsumer.commitSync}.</li>
     * </ul>
     *
     * @param offsetsPerTask offsets to commit, keyed by the task they belong to
     * @throws TaskCorruptedException if at least one transactional commit timed out
     * @throws TaskMigratedException if the consumer commit fails with {@link CommitFailedException}
     * @throws TimeoutException if the consumer commit times out (logged, then rethrown unchanged)
     * @throws StreamsException if the consumer commit fails with any other {@link KafkaException}
     */
    private void commitOffsetsOrTransaction(Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
        // Log the offsets keyed by task id rather than by the Task objects themselves.
        this.log.debug("Committing task offsets {}", (Object)offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> ((Task)t.getKey()).id(), Map.Entry::getValue)));
        HashSet<TaskId> corruptedTasks = new HashSet<TaskId>();
        if (!offsetsPerTask.isEmpty()) {
            if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
                // One transaction per task, each on the producer looked up for that task id.
                for (Map.Entry<Task, Map<TopicPartition, OffsetAndMetadata>> taskToCommit : offsetsPerTask.entrySet()) {
                    Task task2 = taskToCommit.getKey();
                    try {
                        this.tasks.streamsProducerForTask(task2.id()).commitTransaction(taskToCommit.getValue(), this.mainConsumer.groupMetadata());
                        this.updateTaskCommitMetadata(taskToCommit.getValue());
                    }
                    catch (TimeoutException timeoutException) {
                        // Only this task is marked corrupted; remaining tasks are still attempted.
                        this.log.error(String.format("Committing task %s failed.", task2.id()), timeoutException);
                        corruptedTasks.add(task2.id());
                    }
                }
            } else {
                // Both remaining modes commit the union of all tasks' offsets in a single call.
                Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream().flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_V2) {
                    try {
                        this.tasks.threadProducer().commitTransaction(allOffsets, this.mainConsumer.groupMetadata());
                        this.updateTaskCommitMetadata(allOffsets);
                    }
                    catch (TimeoutException timeoutException) {
                        // The shared transaction covers every task, so all of them are marked corrupted.
                        this.log.error(String.format("Committing task(s) %s failed.", offsetsPerTask.keySet().stream().map(t -> t.id().toString()).collect(Collectors.joining(", "))), timeoutException);
                        offsetsPerTask.keySet().forEach(task -> corruptedTasks.add(task.id()));
                    }
                } else {
                    try {
                        this.mainConsumer.commitSync(allOffsets);
                        this.updateTaskCommitMetadata(allOffsets);
                    }
                    catch (CommitFailedException error) {
                        throw new TaskMigratedException("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group", error);
                    }
                    catch (TimeoutException timeoutException) {
                        // Unlike the transactional paths, a consumer-commit timeout is propagated to the caller.
                        this.log.error(String.format("Committing task(s) %s failed.", offsetsPerTask.keySet().stream().map(t -> t.id().toString()).collect(Collectors.joining(", "))), timeoutException);
                        throw timeoutException;
                    }
                    catch (KafkaException error) {
                        throw new StreamsException("Error encountered committing offsets via consumer", error);
                    }
                }
            }
            // Surface all transactional timeouts collected above as a single exception.
            if (!corruptedTasks.isEmpty()) {
                throw new TaskCorruptedException(corruptedTasks);
            }
        }
    }

    /**
     * Pushes the committed offsets to the active {@code StreamTask}s that own the partitions.
     * For each active task that is a {@code StreamTask}, every input partition present in
     * {@code allOffsets} has its committed offset updated on the task.
     *
     * @param allOffsets the offsets that were just committed, keyed by partition
     */
    private void updateTaskCommitMetadata(Map<TopicPartition, OffsetAndMetadata> allOffsets) {
        for (Task active : tasks.activeTasks()) {
            if (active instanceof StreamTask) {
                StreamTask streamTask = (StreamTask)active;
                for (TopicPartition input : active.inputPartitions()) {
                    if (allOffsets.containsKey(input)) {
                        streamTask.updateCommittedOffsets(input, allOffsets.get(input).offset());
                    }
                }
            }
        }
    }

    /**
     * Forwards an end offset to every active {@code StreamTask} whose input partitions contain
     * the given partition.
     *
     * @param topicPartition the partition the offset belongs to
     * @param offset the end offset to record
     */
    public void updateTaskEndMetadata(TopicPartition topicPartition, Long offset) {
        for (Task active : tasks.activeTasks()) {
            if (active instanceof StreamTask && active.inputPartitions().contains(topicPartition)) {
                ((StreamTask)active).updateEndOffsets(topicPartition, offset);
            }
        }
    }

    /**
     * Lets every active task process up to {@code maxNumRecords} records.
     *
     * <p>For each task, {@code task.process(now)} is called repeatedly until it returns
     * {@code false} or the per-task limit is reached; the task timeout is cleared after each
     * successful call. A {@link TimeoutException} is passed to
     * {@code maybeInitTaskTimeoutOrThrow} and, if that call returns normally, processing moves on
     * to the next task. Any other {@link RuntimeException} is logged and rethrown. In all cases
     * the elapsed batch time is reported to the task.
     *
     * @param maxNumRecords upper bound on successful {@code process} calls per task
     * @param time clock used to timestamp each batch
     * @return the total number of successful {@code process} calls across all active tasks
     */
    int process(int maxNumRecords, Time time) {
        int totalProcessed = 0;
        long now = time.milliseconds();
        for (Task task : this.activeTaskIterable()) {
            // Initialised before the try: the finally block reads it on every path, including
            // when an exception is thrown, so it must be definitely assigned at this point.
            int processed = 0;
            long then = now;
            try {
                while (processed < maxNumRecords && task.process(now)) {
                    task.clearTaskTimeout();
                    ++processed;
                }
            }
            catch (TimeoutException timeoutException) {
                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                this.log.debug(String.format("Could not complete processing records for %s due to the following exception; will move to next task and retry later", task.id()), timeoutException);
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", (Object)task.id());
                throw e;
            }
            catch (RuntimeException e) {
                this.log.error("Failed to process stream task {} due to the following error:", (Object)task.id(), (Object)e);
                throw e;
            }
            finally {
                // Refresh the clock so the next task's batch starts from the current time.
                now = time.milliseconds();
                totalProcessed += processed;
                task.recordProcessBatchTime(now - then);
            }
        }
        return totalProcessed;
    }

    /**
     * Calls {@code recordProcessTimeRatioAndBufferSize} on every active task with the given values.
     *
     * @param totalProcessLatencyMs passed through unchanged to each task
     * @param now passed through unchanged to each task
     */
    void recordTaskProcessRatio(long totalProcessLatencyMs, long now) {
        activeTaskIterable().forEach(active -> active.recordProcessTimeRatioAndBufferSize(totalProcessLatencyMs, now));
    }

    /**
     * Gives every active task the chance to punctuate on stream time and on system time.
     * Exceptions are logged and rethrown.
     *
     * @return how many of the {@code maybePunctuateStreamTime()} / {@code maybePunctuateSystemTime()}
     *         calls returned {@code true}
     */
    int punctuate() {
        int fired = 0;
        for (Task active : activeTaskIterable()) {
            try {
                fired += active.maybePunctuateStreamTime() ? 1 : 0;
                fired += active.maybePunctuateSystemTime() ? 1 : 0;
            }
            catch (TaskMigratedException migrated) {
                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", active.id());
                throw migrated;
            }
            catch (KafkaException failure) {
                log.error("Failed to punctuate stream task {} due to the following error:", active.id(), failure);
                throw failure;
            }
        }
        return fired;
    }

    /**
     * Sends a delete-records request for the purgeable offsets of all active tasks, unless a
     * previously sent request has not completed yet.
     *
     * <p>A previous request that completed exceptionally is only logged at debug level. No request
     * is sent when no active task reports purgeable offsets.
     */
    void maybePurgeCommittedRecords() {
        boolean previousStillRunning = deleteRecordsResult != null && !deleteRecordsResult.all().isDone();
        if (previousStillRunning) {
            return;
        }
        if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
            log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
        }
        Map<TopicPartition, RecordsToDelete> pendingDeletes = new HashMap<>();
        for (Task active : activeTaskIterable()) {
            for (Map.Entry<TopicPartition, Long> purgeable : active.purgeableOffsets().entrySet()) {
                pendingDeletes.put(purgeable.getKey(), RecordsToDelete.beforeOffset(purgeable.getValue()));
            }
        }
        if (pendingDeletes.isEmpty()) {
            return;
        }
        deleteRecordsResult = adminClient.deleteRecords(pendingDeletes);
        log.trace("Sent delete-records request: {}", pendingDeletes);
    }

    /** Same as {@code toString("")}, i.e. the description is rendered with an empty indent. */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Renders a textual description of this manager and its tasks.
     *
     * <p>After the header lines, one segment is appended per task: id, state, simple class name
     * and {@code (active)} or {@code (standby)}. No line break is appended after a task segment.
     *
     * @param indent prefix written before the section headers and before each task segment
     * @return the rendered description
     */
    public String toString(String indent) {
        StringBuilder out = new StringBuilder();
        out.append("TaskManager\n");
        out.append(indent).append("\tMetadataState:\n");
        out.append(indent).append("\tTasks:\n");
        for (Task task : tasks.allTasks()) {
            out.append(indent).append("\t\t");
            out.append(task.id()).append(" ");
            out.append((Object)task.state()).append(" ");
            out.append(task.getClass().getSimpleName());
            out.append('(').append(task.isActive() ? "active" : "standby").append(')');
        }
        return out.toString();
    }

    /** Returns the producer metrics as reported by {@code tasks.producerMetrics()}. */
    Map<MetricName, Metric> producerMetrics() {
        Map<MetricName, Metric> metrics = tasks.producerMetrics();
        return metrics;
    }

    /** Returns the producer client ids as reported by {@code tasks.producerClientIds()}. */
    Set<String> producerClientIds() {
        Set<String> clientIds = tasks.producerClientIds();
        return clientIds;
    }

    /** Returns a read-only view of the {@code lockedTaskDirectories} set. */
    Set<TaskId> lockedTaskDirectories() {
        Set<TaskId> readOnlyView = Collections.unmodifiableSet(lockedTaskDirectories);
        return readOnlyView;
    }

    /**
     * Runs {@code runnable} and, if it throws a {@link RuntimeException}, hands that exception to
     * exactly one of the two handlers.
     *
     * @param clean selects the handler: {@code actionIfClean} when {@code true},
     *              {@code actionIfNotClean} when {@code false}
     * @param runnable the action to execute
     * @param actionIfClean invoked with the exception only when {@code clean} is {@code true}
     * @param actionIfNotClean invoked with the exception only when {@code clean} is {@code false}
     */
    public static void executeAndMaybeSwallow(boolean clean, Runnable runnable, java.util.function.Consumer<RuntimeException> actionIfClean, java.util.function.Consumer<RuntimeException> actionIfNotClean) {
        try {
            runnable.run();
        }
        catch (RuntimeException e) {
            // The two handlers are mutually exclusive; a clean run must not also trigger the
            // not-clean handler.
            if (clean) {
                actionIfClean.accept(e);
            } else {
                actionIfNotClean.accept(e);
            }
        }
    }

    /**
     * Runs {@code runnable}; a {@link RuntimeException} it throws is rethrown when {@code clean}
     * is {@code true}, and otherwise dropped after a debug-level log line mentioning {@code name}.
     *
     * @param clean whether failures should propagate to the caller
     * @param runnable the action to execute
     * @param name label used in the debug message
     * @param log logger receiving the debug message
     */
    public static void executeAndMaybeSwallow(boolean clean, Runnable runnable, String name, Logger log) {
        java.util.function.Consumer<RuntimeException> rethrow = failure -> {
            throw failure;
        };
        java.util.function.Consumer<RuntimeException> logAndIgnore = failure -> log.debug("Ignoring error in unclean {}", name);
        TaskManager.executeAndMaybeSwallow(clean, runnable, rethrow, logAndIgnore);
    }

    /**
     * Reports whether any task in {@code tasks()} still needs initialization or restoration.
     *
     * @return {@code true} as soon as one task answers {@code needsInitializationOrRestoration()}
     *         with {@code true}; {@code false} if none does
     */
    boolean needsInitializationOrRestoration() {
        for (Task task : tasks().values()) {
            if (task.needsInitializationOrRestoration()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers the given task by delegating to {@code tasks.addTask}.
     *
     * @param task the task to add
     */
    void addTask(Task task) {
        tasks.addTask(task);
    }
}

