/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.Tasks;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class TaskManagerTest {
    // Topic names used by the partition fixtures below.
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Fixtures for TaskId(0, 0): input partition "topic1"-0 and changelog partition "changelog"-0.
    private final TaskId taskId00 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p0changelog = new TopicPartition("changelog", 0);
    private final Set<TopicPartition> taskId00Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0});
    private final Set<TopicPartition> taskId00ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0changelog});
    private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
    // Fixtures for TaskId(0, 1): input partition "topic1"-1 and changelog partition "changelog"-1.
    private final TaskId taskId01 = new TaskId(0, 1);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // NOTE(review): named t2p2 but refers to partition 1 of "topic2" -- confirm this is intended.
    private final TopicPartition t2p2 = new TopicPartition("topic2", 1);
    private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1);
    private final Set<TopicPartition> taskId01Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1});
    private final Set<TopicPartition> taskId01ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1changelog});
    private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = Collections.singletonMap(this.taskId01, this.taskId01Partitions);
    // Fixtures for TaskId(0, 2): input partition "topic1"-2 and changelog partition "changelog"-2.
    private final TaskId taskId02 = new TaskId(0, 2);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p2changelog = new TopicPartition("changelog", 2);
    private final Set<TopicPartition> taskId02Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p2});
    private final Set<TopicPartition> taskId02ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p2changelog});
    // Fixtures for TaskId(0, 3): input partition "topic1"-3 and changelog partition "changelog"-3.
    private final TaskId taskId03 = new TaskId(0, 3);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t1p3changelog = new TopicPartition("changelog", 3);
    private final Set<TopicPartition> taskId03Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p3});
    private final Set<TopicPartition> taskId03ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p3changelog});
    // Fixtures for TaskId(0, 4): input partition "topic1"-4 and changelog partition "changelog"-4.
    private final TaskId taskId04 = new TaskId(0, 4);
    private final TopicPartition t1p4 = new TopicPartition("topic1", 4);
    private final TopicPartition t1p4changelog = new TopicPartition("changelog", 4);
    private final Set<TopicPartition> taskId04Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p4});
    private final Set<TopicPartition> taskId04ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p4changelog});
    // Fixtures for TaskId(0, 5): input partition "topic1"-5 and changelog partition "changelog"-5.
    private final TaskId taskId05 = new TaskId(0, 5);
    private final TopicPartition t1p5 = new TopicPartition("topic1", 5);
    private final TopicPartition t1p5changelog = new TopicPartition("changelog", 5);
    private final Set<TopicPartition> taskId05Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p5});
    private final Set<TopicPartition> taskId05ChangelogPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p5changelog});
    // Fixtures for TaskId(1, 0): input partition "topic2"-0 (no changelog fixture is declared here).
    private final TaskId taskId10 = new TaskId(1, 0);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final Set<TopicPartition> taskId10Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t2p0});
    // A single-partition set on topic "assignment".
    private final Set<TopicPartition> assignment = Collections.singleton(new TopicPartition("assignment", 0));
    // Offset-resetter callback that ignores the partitions it is given.
    final java.util.function.Consumer<Set<TopicPartition>> noOpResetter = partitions -> {};
    // Collaborators declared with @Mock; most are passed to the TaskManager constructor in setUpTaskManager.
    @Mock
    private InternalTopologyBuilder topologyBuilder;
    @Mock
    private StateDirectory stateDirectory;
    @Mock
    private ChangelogReader changeLogReader;
    @Mock
    private Consumer<byte[], byte[]> consumer;
    @Mock
    private ActiveTaskCreator activeTaskCreator;
    @Mock
    private StandbyTaskCreator standbyTaskCreator;
    @Mock
    private Admin adminClient;
    @Mock
    private ProcessorStateManager stateManager;
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    private ProcessorStateManager.StateStoreMetadata stateStore;
    // Mocks created eagerly; setUpTaskManager passes them to the TaskManager only when the
    // corresponding feature flag is true, otherwise it passes null.
    final StateUpdater stateUpdater = (StateUpdater)Mockito.mock(StateUpdater.class);
    final DefaultTaskManager schedulingTaskManager = (DefaultTaskManager)Mockito.mock(DefaultTaskManager.class);
    // TaskManager built in setUp() with the default configuration; individual tests may build their own.
    private TaskManager taskManager;
    // Re-created on every setUpTaskManager call.
    private TopologyMetadata topologyMetadata;
    // Mock clock handed to the TaskManager.
    private final Time time = new MockTime();
    // JUnit rule providing a temporary folder.
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();
    // NOTE(review): the class is also run with MockitoJUnitRunner.StrictStubs -- confirm that both
    // the runner and this strict-stubs rule are intended.
    @Rule
    public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

    /**
     * Builds the default {@link TaskManager} under test before each test: at-least-once
     * processing, no pre-built task registry, and both the state updater and the processing
     * threads disabled.
     */
    @Before
    public void setUp() {
        // A null registry makes setUpTaskManager create a new Tasks instance.
        final TasksRegistry noPredefinedRegistry = null;
        this.taskManager = this.setUpTaskManager(
            StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
            noPredefinedRegistry,
            false,
            false
        );
    }

    /**
     * Convenience overload: builds a {@link TaskManager} without a pre-built task registry and
     * with processing threads disabled.
     *
     * @param processingMode      processing mode used for the dummy streams config
     * @param stateUpdaterEnabled whether the mocked state updater is handed to the task manager
     * @return the configured task manager
     */
    private TaskManager setUpTaskManager(StreamsConfigUtils.ProcessingMode processingMode, boolean stateUpdaterEnabled) {
        final TasksRegistry noPredefinedRegistry = null;
        return this.setUpTaskManager(processingMode, noPredefinedRegistry, stateUpdaterEnabled);
    }

    /**
     * Convenience overload: builds a {@link TaskManager} with processing threads disabled.
     *
     * @param processingMode      processing mode used for the dummy streams config
     * @param tasks               task registry to use, or {@code null} to have a new one created
     * @param stateUpdaterEnabled whether the mocked state updater is handed to the task manager
     * @return the configured task manager
     */
    private TaskManager setUpTaskManager(StreamsConfigUtils.ProcessingMode processingMode, TasksRegistry tasks, boolean stateUpdaterEnabled) {
        final boolean processingThreadsEnabled = false;
        return this.setUpTaskManager(processingMode, tasks, stateUpdaterEnabled, processingThreadsEnabled);
    }

    /**
     * Builds a {@link TaskManager} wired to this test's mocks and sets the mocked main consumer
     * on it. Also replaces {@link #topologyMetadata} with a new instance for the given mode.
     *
     * @param processingMode           processing mode used for the dummy streams config
     * @param tasks                    task registry to use, or {@code null} to create a new {@link Tasks}
     * @param stateUpdaterEnabled      pass the mocked state updater if {@code true}, else {@code null}
     * @param processingThreadsEnabled pass the mocked scheduling task manager if {@code true}, else {@code null}
     * @return the configured task manager
     */
    private TaskManager setUpTaskManager(StreamsConfigUtils.ProcessingMode processingMode, TasksRegistry tasks, boolean stateUpdaterEnabled, boolean processingThreadsEnabled) {
        final StreamsConfig config = new DummyStreamsConfig(processingMode);
        this.topologyMetadata = new TopologyMetadata(this.topologyBuilder, config);

        final ProcessId processId = ProcessId.randomProcessId();

        final TasksRegistry registry;
        if (tasks == null) {
            registry = new Tasks(new LogContext());
        } else {
            registry = tasks;
        }

        StateUpdater updaterOrNull = null;
        if (stateUpdaterEnabled) {
            updaterOrNull = this.stateUpdater;
        }

        DefaultTaskManager schedulerOrNull = null;
        if (processingThreadsEnabled) {
            schedulerOrNull = this.schedulingTaskManager;
        }

        final TaskManager manager = new TaskManager(
            this.time,
            this.changeLogReader,
            processId,
            "taskManagerTest",
            this.activeTaskCreator,
            this.standbyTaskCreator,
            registry,
            this.topologyMetadata,
            this.adminClient,
            this.stateDirectory,
            updaterOrNull,
            schedulerOrNull
        );
        manager.setMainConsumer(this.consumer);
        return manager;
    }

    /**
     * With the state updater disabled, handling an assignment made of running active, restoring
     * active and standby tasks must never touch the state updater mock.
     */
    @Test
    public void shouldClassifyExistingTasksWithoutStateUpdater() {
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, false);
        final Map<TaskId, Set<TopicPartition>> runningActiveTasks =
            Utils.mkMap(Utils.mkEntry(this.taskId01, Utils.mkSet(this.t1p1)));
        final Map<TaskId, Set<TopicPartition>> standbyTasks =
            Utils.mkMap(Utils.mkEntry(this.taskId02, Utils.mkSet(this.t2p2)));
        final Map<TaskId, Set<TopicPartition>> restoringActiveTasks =
            Utils.mkMap(Utils.mkEntry(this.taskId03, Utils.mkSet(this.t1p3)));

        // The active assignment is the union of the running and the restoring active tasks.
        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(runningActiveTasks);
        activeTasks.putAll(restoringActiveTasks);

        // Test-class helper (defined outside this excerpt) that prepares the three task groups.
        this.handleAssignment(runningActiveTasks, standbyTasks, restoringActiveTasks);

        taskManager.handleAssignment(activeTasks, standbyTasks);

        Mockito.verifyNoInteractions(this.stateUpdater);
    }

    /**
     * Re-assigning a running standby task with the input partitions it already has must not call
     * {@code updateInputPartitions} with those partitions.
     */
    @Test
    public void shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater() {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(this.taskId03, this.taskId03ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId03Partitions)
            .build();

        this.updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, this.taskId03Partitions);

        Mockito.verify(standbyTask, Mockito.never())
            .updateInputPartitions(ArgumentMatchers.eq(this.taskId03Partitions), ArgumentMatchers.any());
    }

    /**
     * Re-assigning a running standby task with different input partitions must call
     * {@code updateInputPartitions} with the new partitions.
     */
    @Test
    public void shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater() {
        final StandbyTask standbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(this.taskId03, this.taskId03ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId03Partitions)
            .build();

        this.updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, this.taskId04Partitions);

        Mockito.verify(standbyTask)
            .updateInputPartitions(ArgumentMatchers.eq(this.taskId04Partitions), ArgumentMatchers.any());
    }

    /**
     * Shared scenario: a task manager without state updater whose registry contains only
     * {@code standbyTask} receives an assignment that keeps the task as standby with
     * {@code newInputPartition}; the task is then expected to have been resumed.
     *
     * @param standbyTask       the existing standby task (a mock that can be verified)
     * @param newInputPartition the input partitions assigned to the task in the new assignment
     */
    private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(Task standbyTask, Set<TopicPartition> newInputPartition) {
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        Mockito.when(registry.allTasks()).thenReturn(Utils.mkSet(standbyTask));
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, false);

        final Map<TaskId, Set<TopicPartition>> noActiveTasks = Collections.emptyMap();
        final Map<TaskId, Set<TopicPartition>> standbyAssignment =
            Utils.mkMap(Utils.mkEntry(standbyTask.id(), newInputPartition));
        manager.handleAssignment(noActiveTasks, standbyAssignment);

        Mockito.verify(standbyTask).resume();
    }

    /**
     * With state updater and processing threads enabled, handling corruption of
     * {@code taskId00} must lock and unlock all active task ids reported by the registry
     * ({@code taskId00} and {@code taskId01}) and must query the consumer's assignment.
     */
    @Test
    public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
        final StreamTask activeTask1 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId00Partitions)
            .build();
        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
        Mockito.when(tasks.activeTaskIds()).thenReturn(Utils.mkSet(this.taskId00, this.taskId01));
        Mockito.when(tasks.task(this.taskId00)).thenReturn(activeTask1);
        // Locking completes immediately so the call under test does not block.
        final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
        Mockito.when(this.schedulingTaskManager.lockTasks(ArgumentMatchers.any())).thenReturn(mockFuture);

        taskManager.handleCorruption(Utils.mkSet(this.taskId00));

        Mockito.verify(this.consumer).assignment();
        Mockito.verify(this.schedulingTaskManager).lockTasks(Utils.mkSet(this.taskId00, this.taskId01));
        Mockito.verify(this.schedulingTaskManager).unlockTasks(Utils.mkSet(this.taskId00, this.taskId01));
    }

    /**
     * With state updater and processing threads enabled, committing two running active tasks
     * must lock and unlock exactly those two task ids on the scheduling task manager.
     *
     * <p>NOTE(review): despite "OnCorruption" in the name, the call exercised here is
     * {@code TaskManager#commit}; the name is kept unchanged.
     */
    @Test
    public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() {
        final StreamTask activeTask1 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId00Partitions)
            .build();
        final StreamTask activeTask2 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId01Partitions)
            .build();
        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
        // Locking completes immediately so the call under test does not block.
        final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
        Mockito.when(this.schedulingTaskManager.lockTasks(ArgumentMatchers.any())).thenReturn(mockFuture);

        taskManager.commit(Utils.mkSet(activeTask1, activeTask2));

        Mockito.verify(this.schedulingTaskManager).lockTasks(Utils.mkSet(this.taskId00, this.taskId01));
        Mockito.verify(this.schedulingTaskManager).unlockTasks(Utils.mkSet(this.taskId00, this.taskId01));
    }

    /**
     * With state updater and processing threads enabled, handling a new assignment must lock and
     * unlock all task ids reported by the registry ({@code taskId00} and {@code taskId01}).
     */
    @Test
    public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
        Mockito.when(tasks.allTaskIds()).thenReturn(Utils.mkSet(this.taskId00, this.taskId01));
        // Locking completes immediately so the call under test does not block.
        final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
        Mockito.when(this.schedulingTaskManager.lockTasks(ArgumentMatchers.any())).thenReturn(mockFuture);

        final Map<TaskId, Set<TopicPartition>> activeAssignment =
            Utils.mkMap(Utils.mkEntry(this.taskId00, this.taskId00Partitions));
        final Map<TaskId, Set<TopicPartition>> standbyAssignment =
            Utils.mkMap(Utils.mkEntry(this.taskId01, this.taskId01Partitions));
        taskManager.handleAssignment(activeAssignment, standbyAssignment);

        Mockito.verify(this.schedulingTaskManager).lockTasks(Utils.mkSet(this.taskId00, this.taskId01));
        Mockito.verify(this.schedulingTaskManager).unlockTasks(Utils.mkSet(this.taskId00, this.taskId01));
    }

    /**
     * With state updater and processing threads enabled, revoking the partitions of
     * {@code taskId01} is expected to lock and unlock both registered tasks
     * ({@code taskId00} and {@code taskId01}) on the scheduling task manager.
     */
    @Test
    public void shouldLockAffectedTasksOnHandleRevocation() {
        final StreamTask activeTask1 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId00Partitions)
            .build();
        final StreamTask activeTask2 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId01Partitions)
            .build();
        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
        Mockito.when(tasks.allTasks()).thenReturn(Utils.mkSet(activeTask1, activeTask2));
        // Locking completes immediately so the call under test does not block.
        final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
        Mockito.when(this.schedulingTaskManager.lockTasks(ArgumentMatchers.any())).thenReturn(mockFuture);

        taskManager.handleRevocation(this.taskId01Partitions);

        Mockito.verify(this.schedulingTaskManager).lockTasks(Utils.mkSet(this.taskId00, this.taskId01));
        Mockito.verify(this.schedulingTaskManager).unlockTasks(Utils.mkSet(this.taskId00, this.taskId01));
    }

    /**
     * With state updater and processing threads enabled, closing and cleaning up only
     * {@code taskId00} must lock and unlock just that task id, even though the registry also
     * contains {@code taskId01}.
     */
    @Test
    public void shouldLockTasksOnClose() {
        final StreamTask activeTask1 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId00Partitions)
            .build();
        final StreamTask activeTask2 = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId01Partitions)
            .build();
        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
        Mockito.when(tasks.allTasks()).thenReturn(Utils.mkSet(activeTask1, activeTask2));
        // Locking completes immediately so the call under test does not block.
        final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
        Mockito.when(this.schedulingTaskManager.lockTasks(ArgumentMatchers.any())).thenReturn(mockFuture);

        taskManager.closeAndCleanUpTasks(Utils.mkSet(activeTask1), Utils.mkSet(), false);

        Mockito.verify(this.schedulingTaskManager).lockTasks(Utils.mkSet(this.taskId00));
        Mockito.verify(this.schedulingTaskManager).unlockTasks(Utils.mkSet(this.taskId00));
    }

    /**
     * {@code resumePollingForPartitionsWithAvailableSpace} on the task manager must be forwarded
     * to every active task returned by the registry.
     */
    @Test
    public void shouldResumePollingForPartitionsWithAvailableSpaceForAllActiveTasks() {
        final StreamTask firstActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId00Partitions)
            .build();
        final StreamTask secondActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId01Partitions)
            .build();
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(registry.activeTasks()).thenReturn(Utils.mkSet(firstActive, secondActive));

        manager.resumePollingForPartitionsWithAvailableSpace();

        Mockito.verify(firstActive).resumePollingForPartitionsWithAvailableSpace();
        Mockito.verify(secondActive).resumePollingForPartitionsWithAvailableSpace();
    }

    /**
     * {@code updateLags} on the task manager must be forwarded to every active task returned by
     * the registry.
     */
    @Test
    public void shouldUpdateLagForAllActiveTasks() {
        final StreamTask firstActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId00Partitions)
            .build();
        final StreamTask secondActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId01Partitions)
            .build();
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(registry.activeTasks()).thenReturn(Utils.mkSet(firstActive, secondActive));

        manager.updateLags();

        Mockito.verify(firstActive).updateLags();
        Mockito.verify(secondActive).updateLags();
    }

    /**
     * A restoring active task held by the state updater that is absent from an empty assignment
     * and is removed without error must be suspended and closed clean, and its task producer
     * must be closed and removed.
     */
    @Test
    public void shouldRemoveUnusedActiveTaskFromStateUpdaterAndCloseCleanly() {
        final StreamTask unusedActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
            .inState(Task.State.RESTORING)
            .withInputPartitions(this.taskId03Partitions)
            .build();
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(unusedActive));
        // The removal future is already completed successfully when the assignment is handled.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = new CompletableFuture<>();
        removal.complete(new StateUpdater.RemovedTaskResult(unusedActive));
        Mockito.when(this.stateUpdater.remove(unusedActive.id())).thenReturn(removal);

        manager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

        Mockito.verify(unusedActive).suspend();
        Mockito.verify(unusedActive).closeClean();
        Mockito.verify(this.activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(unusedActive.id());
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A restoring active task held by the state updater that is absent from an empty assignment
     * and whose removal result carries an exception must be prepared for commit, suspended and
     * closed dirty, and its task producer must be closed and removed.
     */
    @Test
    public void shouldRemoveUnusedFailedActiveTaskFromStateUpdaterAndCloseDirty() {
        final StreamTask failedActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
            .inState(Task.State.RESTORING)
            .withInputPartitions(this.taskId03Partitions)
            .build();
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(failedActive));
        // The removal completes normally, but its result reports a failure for the task.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = new CompletableFuture<>();
        removal.complete(new StateUpdater.RemovedTaskResult(failedActive, new RuntimeException("KABOOM!")));
        Mockito.when(this.stateUpdater.remove(failedActive.id())).thenReturn(removal);

        manager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

        Mockito.verify(failedActive).prepareCommit();
        Mockito.verify(failedActive).suspend();
        Mockito.verify(failedActive).closeDirty();
        Mockito.verify(this.activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedActive.id());
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A standby task held by the state updater that is absent from an empty assignment and is
     * removed without error must be suspended and closed clean.
     */
    @Test
    public void shouldRemoveUnusedStandbyTaskFromStateUpdaterAndCloseCleanly() {
        final StandbyTask unusedStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId02Partitions)
            .build();
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(unusedStandby));
        // The removal future is already completed successfully when the assignment is handled.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = new CompletableFuture<>();
        removal.complete(new StateUpdater.RemovedTaskResult(unusedStandby));
        Mockito.when(this.stateUpdater.remove(unusedStandby.id())).thenReturn(removal);

        manager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

        Mockito.verify(unusedStandby).suspend();
        Mockito.verify(unusedStandby).closeClean();
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A standby task held by the state updater that is absent from an empty assignment and whose
     * removal result carries an exception must be prepared for commit, suspended and closed dirty.
     */
    @Test
    public void shouldRemoveUnusedFailedStandbyTaskFromStateUpdaterAndCloseDirty() {
        final StandbyTask failedStandby = StreamsTestUtils.TaskBuilder
            .standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId02Partitions)
            .build();
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(failedStandby));
        // The removal completes normally, but its result reports a failure for the task.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = new CompletableFuture<>();
        removal.complete(new StateUpdater.RemovedTaskResult(failedStandby, new RuntimeException("KABOOM!")));
        Mockito.when(this.stateUpdater.remove(failedStandby.id())).thenReturn(removal);

        manager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

        Mockito.verify(failedStandby).prepareCommit();
        Mockito.verify(failedStandby).suspend();
        Mockito.verify(failedStandby).closeDirty();
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * When removing a standby task from the state updater completes exceptionally and the state
     * updater reports that task as failed, handling an empty assignment must throw a
     * {@link StreamsException} naming the task, with a {@link RuntimeException} cause carrying
     * the failure message, and the task must be recorded as failed in the registry.
     */
    @Test
    public void shouldCollectFailedTaskFromStateUpdaterAndRethrow() {
        final StandbyTask failedStandbyTask = StreamsTestUtils.TaskBuilder
            .standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .withInputPartitions(this.taskId02Partitions)
            .build();
        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        final TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(failedStandbyTask));
        final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
        Mockito.when(this.stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
        final RuntimeException kaboom = new RuntimeException("KABOOM!");
        future.completeExceptionally(kaboom);
        // The drained failure is a distinct exception instance with the same message as kaboom.
        Mockito.when(this.stateUpdater.drainExceptionsAndFailedTasks())
            .thenReturn(Collections.singletonList(new StateUpdater.ExceptionAndTask(new RuntimeException("KABOOM!"), failedStandbyTask)));

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap())
        );

        Assert.assertEquals("Encounter unexpected fatal error for task " + failedStandbyTask.id(), exception.getMessage());
        Assertions.assertInstanceOf(RuntimeException.class, exception.getCause());
        Assert.assertEquals(kaboom.getMessage(), exception.getCause().getMessage());
        Mockito.verify(tasks).addFailedTask(failedStandbyTask);
    }

    /**
     * A restoring active task held by the state updater that is re-assigned as active with
     * different input partitions must first get its input partitions updated and then be added
     * back to the state updater.
     */
    @Test
    public void shouldUpdateInputPartitionOfActiveTaskInStateUpdater() {
        final StreamTask reassignedActive = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
            .inState(Task.State.RESTORING)
            .withInputPartitions(this.taskId03Partitions)
            .build();
        final Set<TopicPartition> newInputPartitions = this.taskId02Partitions;
        final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
        final TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(reassignedActive));
        // The removal future is already completed successfully when the assignment is handled.
        final CompletableFuture<StateUpdater.RemovedTaskResult> removal = new CompletableFuture<>();
        removal.complete(new StateUpdater.RemovedTaskResult(reassignedActive));
        Mockito.when(this.stateUpdater.remove(reassignedActive.id())).thenReturn(removal);

        final Map<TaskId, Set<TopicPartition>> activeAssignment =
            Utils.mkMap(Utils.mkEntry(reassignedActive.id(), newInputPartitions));
        manager.handleAssignment(activeAssignment, Collections.emptyMap());

        final InOrder updateThenAddBack = Mockito.inOrder(this.stateUpdater, reassignedActive);
        updateThenAddBack.verify(reassignedActive)
            .updateInputPartitions(ArgumentMatchers.eq(newInputPartitions), ArgumentMatchers.any());
        updateThenAddBack.verify(this.stateUpdater).add(reassignedActive);
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A restoring active task held by the state updater is assigned as a standby task. The
     * standby task creator is stubbed to convert it, and the test expects the converted
     * standby task to be registered as a pending task to initialize while both task creators
     * are invoked with empty maps.
     */
    @Test
    public void shouldRecycleActiveTaskInStateUpdater() {
        // Arrange
        StreamTask restoringActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StandbyTask convertedStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.CREATED)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(restoringActive));
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(restoringActive, this.taskId03Partitions))
                .thenReturn(convertedStandby);
        Mockito.when(this.stateUpdater.remove(this.taskId03))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(restoringActive)));

        // Act: the task id now appears only in the standby assignment.
        taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(restoringActive.id(), restoringActive.inputPartitions())));

        // Assert
        Mockito.verify(tasks).addPendingTasksToInit(Collections.singleton(convertedStandby));
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A restoring active task held by the state updater is assigned as a standby task, but the
     * standby task creator is stubbed to throw while converting it. The test expects
     * {@code handleAssignment} to throw a {@link StreamsException}, nothing to be added to the
     * state updater or to the pending-to-init tasks, and the original task to be closed dirty.
     */
    @Test
    public void shouldHandleExceptionThrownDuringRecyclingActiveTask() {
        // Arrange
        StreamTask activeTaskToRecycle = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId00Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(activeTaskToRecycle));
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, activeTaskToRecycle.inputPartitions()))
                .thenThrow(new RuntimeException());
        Mockito.when(this.stateUpdater.remove(activeTaskToRecycle.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(activeTaskToRecycle)));

        // Act + assert: the conversion failure surfaces as a StreamsException.
        Assert.assertThrows(StreamsException.class, () -> taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions()))));

        Mockito.verify(this.stateUpdater, Mockito.never()).add(ArgumentMatchers.any());
        // The matcher must be the argument itself; wrapping any() in Collections.singleton(...)
        // is a matcher misuse that only worked because the method takes a single argument.
        Mockito.verify(tasks, Mockito.never()).addPendingTasksToInit(ArgumentMatchers.any());
        Mockito.verify(activeTaskToRecycle).closeDirty();
    }

    /**
     * A running standby task held by the state updater is assigned as an active task. The
     * active task creator is stubbed to convert it, and the test expects the converted active
     * task to be registered as a pending task to initialize while both task creators are
     * invoked with empty maps.
     */
    @Test
    public void shouldRecycleStandbyTaskInStateUpdater() {
        // Arrange
        StandbyTask runningStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StreamTask convertedActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.CREATED)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(runningStandby));
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby(runningStandby, this.taskId03Partitions, this.consumer))
                .thenReturn(convertedActive);
        Mockito.when(this.stateUpdater.remove(runningStandby.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(runningStandby)));

        // Act: the task id now appears only in the active assignment.
        taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(runningStandby.id(), runningStandby.inputPartitions())),
                Collections.emptyMap());

        // Assert
        Mockito.verify(tasks).addPendingTasksToInit(Collections.singleton(convertedActive));
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A running standby task held by the state updater is assigned as an active task, but the
     * active task creator is stubbed to throw while converting it. The test expects
     * {@code handleAssignment} to throw a {@link StreamsException}, nothing to be added to the
     * state updater or to the pending-to-init tasks, and the original task to be closed dirty.
     */
    @Test
    public void shouldHandleExceptionThrownDuringRecyclingStandbyTask() {
        // Arrange
        StandbyTask standbyTaskToRecycle = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(standbyTaskToRecycle));
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, standbyTaskToRecycle.inputPartitions(), this.consumer))
                .thenThrow(new RuntimeException());
        Mockito.when(this.stateUpdater.remove(standbyTaskToRecycle.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(standbyTaskToRecycle)));

        // Act + assert: the conversion failure surfaces as a StreamsException.
        Assert.assertThrows(StreamsException.class, () -> taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions())),
                Collections.emptyMap()));

        Mockito.verify(this.stateUpdater, Mockito.never()).add(ArgumentMatchers.any());
        // The matcher must be the argument itself; wrapping any() in Collections.singleton(...)
        // is a matcher misuse that only worked because the method takes a single argument.
        Mockito.verify(tasks, Mockito.never()).addPendingTasksToInit(ArgumentMatchers.any());
        Mockito.verify(standbyTaskToRecycle).closeDirty();
    }

    /**
     * A restoring active task held by the state updater is assigned again as active with the
     * same input partitions. The test expects the state updater never to be asked to remove
     * it and both task creators to be invoked with empty maps.
     */
    @Test
    public void shouldKeepReassignedActiveTaskInStateUpdater() {
        // Arrange
        StreamTask unchangedTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(unchangedTask));

        // Act
        taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(unchangedTask.id(), unchangedTask.inputPartitions())),
                Collections.emptyMap());

        // Assert
        Mockito.verify(this.stateUpdater, Mockito.never()).remove(unchangedTask.id());
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A suspended active task reported by the task registry is assigned again as active with
     * the same input partitions. The test expects it to be removed from the registry and
     * added to the state updater, with both task creators invoked with empty maps.
     */
    @Test
    public void shouldMoveReassignedSuspendedActiveTaskToStateUpdater() {
        // Arrange: the task lives in the registry, not in the state updater.
        StreamTask suspendedTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.SUSPENDED)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(tasks.allNonFailedTasks()).thenReturn(Utils.mkSet(suspendedTask));

        // Act
        taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(suspendedTask.id(), suspendedTask.inputPartitions())),
                Collections.emptyMap());

        // Assert
        Mockito.verify(tasks).removeTask(suspendedTask);
        Mockito.verify(this.stateUpdater).add(suspendedTask);
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A restoring active task held by the state updater is assigned as standby, but removing
     * it from the state updater yields a result that carries an exception. The test expects a
     * {@link StreamsException} naming the task and wrapping that exception, the task to be
     * recorded as failed in the registry, and no conversion to a standby task to be attempted.
     */
    @Test
    public void shouldAddFailedActiveTaskToRecycleDuringAssignmentToTaskRegistry() {
        // Arrange
        StreamTask failingTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(failingTask));
        RuntimeException removalFailure = new RuntimeException("Nobody expects the Spanish inquisition!");
        Mockito.when(this.stateUpdater.remove(failingTask.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(failingTask, removalFailure)));

        // Act
        StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(failingTask.id(), failingTask.inputPartitions()))));

        // Assert
        Assert.assertEquals("Encounter unexpected fatal error for task " + failingTask.id(), thrown.getMessage());
        Assert.assertEquals(removalFailure, thrown.getCause());
        Mockito.verify(tasks).addFailedTask(failingTask);
        Mockito.verify(tasks, Mockito.never()).addTask(failingTask);
        Mockito.verify(tasks).allNonFailedTasks();
        Mockito.verify(this.standbyTaskCreator, Mockito.never()).createStandbyTaskFromActive(failingTask, this.taskId03Partitions);
    }

    /**
     * A running standby task held by the state updater is assigned as active, but removing it
     * from the state updater yields a result that carries an exception. The test expects a
     * {@link StreamsException} naming the task and wrapping that exception, the task to be
     * recorded as failed in the registry, and no conversion to an active task to be attempted.
     */
    @Test
    public void shouldAddFailedStandbyTaskToRecycleDuringAssignmentToTaskRegistry() {
        // Arrange
        StandbyTask failingTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(failingTask));
        RuntimeException removalFailure = new RuntimeException("Nobody expects the Spanish inquisition!");
        Mockito.when(this.stateUpdater.remove(failingTask.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(failingTask, removalFailure)));

        // Act
        StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(failingTask.id(), failingTask.inputPartitions())),
                Collections.emptyMap()));

        // Assert
        Assert.assertEquals("Encounter unexpected fatal error for task " + failingTask.id(), thrown.getMessage());
        Assert.assertEquals(removalFailure, thrown.getCause());
        Mockito.verify(tasks).addFailedTask(failingTask);
        Mockito.verify(tasks, Mockito.never()).addTask(failingTask);
        Mockito.verify(tasks).allNonFailedTasks();
        Mockito.verify(this.activeTaskCreator, Mockito.never()).createActiveTaskFromStandby(failingTask, this.taskId03Partitions, this.consumer);
    }

    /**
     * A restoring active task held by the state updater is assigned again as active with
     * different input partitions, but removing it from the state updater yields a result that
     * carries an exception. The test expects a {@link StreamsException} naming the task and
     * wrapping that exception, the task to be recorded as failed in the registry, and the
     * registry never to be asked to update the task's input partitions.
     */
    @Test
    public void shouldAddFailedActiveTasksToReassignWithDifferentInputPartitionsDuringAssignmentToTaskRegistry() {
        // Arrange
        StreamTask failingTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(failingTask));
        RuntimeException removalFailure = new RuntimeException("Nobody expects the Spanish inquisition!");
        Mockito.when(this.stateUpdater.remove(failingTask.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(failingTask, removalFailure)));

        // Act: same task id, but assigned the partitions of task 0_0.
        StreamsException thrown = Assert.assertThrows(StreamsException.class, () -> taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(failingTask.id(), this.taskId00Partitions)),
                Collections.emptyMap()));

        // Assert
        Assert.assertEquals("Encounter unexpected fatal error for task " + failingTask.id(), thrown.getMessage());
        Assert.assertEquals(removalFailure, thrown.getCause());
        Mockito.verify(tasks).addFailedTask(failingTask);
        Mockito.verify(tasks, Mockito.never()).addTask(failingTask);
        Mockito.verify(tasks).allNonFailedTasks();
        Mockito.verify(tasks, Mockito.never()).updateActiveTaskInputPartitions(failingTask, this.taskId00Partitions);
    }

    /**
     * Two active tasks are assigned: a suspended one reported by the task registry (same
     * partitions) and a restoring one held by the state updater (different partitions). The
     * test verifies the order of interactions: the state updater is asked to remove its task
     * first, then the suspended task is removed from the registry and added to the state
     * updater.
     */
    @Test
    public void shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRegistry() {
        // Arrange
        StreamTask suspendedInRegistry = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.SUSPENDED)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StreamTask restoringInUpdater = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(tasks.allNonFailedTasks()).thenReturn(Utils.mkSet(suspendedInRegistry));
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(restoringInUpdater));
        Mockito.when(this.stateUpdater.remove(restoringInUpdater.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(restoringInUpdater)));

        // Act
        taskManager.handleAssignment(
                Utils.mkMap(
                        Utils.mkEntry(suspendedInRegistry.id(), suspendedInRegistry.inputPartitions()),
                        Utils.mkEntry(restoringInUpdater.id(), this.taskId00Partitions)),
                Collections.emptyMap());

        // Assert: state-updater work precedes registry work.
        InOrder sequence = Mockito.inOrder(this.stateUpdater, tasks);
        sequence.verify(this.stateUpdater).remove(restoringInUpdater.id());
        sequence.verify(tasks).removeTask(suspendedInRegistry);
        sequence.verify(this.stateUpdater).add(suspendedInRegistry);
    }

    /**
     * A running standby task held by the state updater is assigned again as standby with
     * different input partitions. The test expects the state updater never to be asked to
     * remove it and both task creators to be invoked with empty maps.
     */
    @Test
    public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
        // Arrange
        StandbyTask standbyInUpdater = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(standbyInUpdater));

        // Act: same task id, but assigned the partitions of task 0_3.
        taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(standbyInUpdater.id(), this.taskId03Partitions)));

        // Assert
        Mockito.verify(this.stateUpdater, Mockito.never()).remove(standbyInUpdater.id());
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A running standby task held by the state updater is assigned again as standby with the
     * same input partitions. The test expects the state updater never to be asked to remove
     * it and both task creators to be invoked with empty maps.
     */
    @Test
    public void shouldKeepReassignedStandbyTaskInStateUpdater() {
        // Arrange
        StandbyTask reassignedStandbyTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(reassignedStandbyTask));

        // Act
        taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions())));

        // Assert: "kept" means no removal was requested, mirroring the active-task variant of
        // this test; without this check the test would not pin what its name promises.
        Mockito.verify(this.stateUpdater, Mockito.never()).remove(reassignedStandbyTask.id());
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * The state updater holds a restoring active task that is no longer assigned and a
     * running standby task that is now assigned as active. The test expects the unassigned
     * task to be suspended, closed cleanly and its producer released, and the standby task's
     * stubbed active replacement to be registered as a pending task to initialize.
     */
    @Test
    public void shouldAssignMultipleTasksInStateUpdater() {
        // Arrange
        StreamTask unassignedActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RESTORING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StandbyTask standbyToConvert = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        StreamTask convertedActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.CREATED)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(unassignedActive, standbyToConvert));
        Mockito.when(this.stateUpdater.remove(unassignedActive.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(unassignedActive)));
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby(standbyToConvert, this.taskId02Partitions, this.consumer))
                .thenReturn(convertedActive);
        Mockito.when(this.stateUpdater.remove(standbyToConvert.id()))
                .thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(standbyToConvert)));

        // Act: only the former standby task id is assigned, and it is assigned as active.
        taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(standbyToConvert.id(), standbyToConvert.inputPartitions())),
                Collections.emptyMap());

        // Assert
        Mockito.verify(tasks).addPendingTasksToInit(Collections.singleton(convertedActive));
        Mockito.verify(unassignedActive).suspend();
        Mockito.verify(unassignedActive).closeClean();
        Mockito.verify(this.activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(unassignedActive.id());
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * With one task returned by the registry's {@code allTasksPerId()}, one held by the state
     * updater and one pending initialization, the test expects {@code allTasks()} to return
     * all three keyed by task id.
     */
    @Test
    public void shouldReturnRunningTasksStateUpdaterTasksAndTasksToInitInAllTasks() {
        // Arrange
        // NOTE(review): this task is built for task 0_1 but given the input partitions of
        // task 0_3; it looks like a copy-paste leftover. Left unchanged since the assertion
        // below compares by task id and task instance only - confirm before changing.
        StreamTask activeTaskToInit = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
                .inState(Task.State.CREATED)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StreamTask runningActiveTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StandbyTask standbyTaskInStateUpdater = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(standbyTaskInStateUpdater));
        Mockito.when(tasks.allTasksPerId()).thenReturn(Utils.mkMap(Utils.mkEntry(this.taskId03, runningActiveTask)));
        Mockito.when(tasks.pendingTasksToInit()).thenReturn(Utils.mkSet(activeTaskToInit));

        // Assert: expected value first, so a failure message labels the two sides correctly.
        Assert.assertEquals(
                Utils.mkMap(
                        Utils.mkEntry(this.taskId03, runningActiveTask),
                        Utils.mkEntry(this.taskId02, standbyTaskInStateUpdater),
                        Utils.mkEntry(this.taskId01, activeTaskToInit)),
                taskManager.allTasks());
    }

    /**
     * With one active task returned by the registry's {@code allTasksPerId()} and one standby
     * task held by the state updater, the test expects {@code allOwnedTasks()} to return only
     * the registry's task.
     */
    @Test
    public void shouldNotReturnStateUpdaterTasksInOwnedTasks() {
        // Arrange
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.RUNNING)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when(tasks.allTasksPerId()).thenReturn(Utils.mkMap(Utils.mkEntry(this.taskId03, activeTask)));
        // Actually place the standby task in the state updater; previously it was built but
        // never registered anywhere, so the exclusion under test could not be observed.
        // Stubbed leniently because the code under test is expected not to consult the state
        // updater, which a strict-stubs runner (if one is in use) would report as an unused stub.
        Mockito.lenient().when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(standbyTask));

        // Assert: expected value first, so a failure message labels the two sides correctly.
        Assert.assertEquals(
                Utils.mkMap(Utils.mkEntry(this.taskId03, activeTask)),
                taskManager.allOwnedTasks());
    }

    /**
     * A previously unknown task id is assigned as active. The active task creator is stubbed
     * to return a new task for that assignment, and the test expects the returned tasks to be
     * registered as pending tasks to initialize while the standby task creator is invoked with
     * an empty map.
     */
    @Test
    public void shouldCreateActiveTaskDuringAssignment() {
        // Arrange
        StreamTask newActiveTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions)
                .inState(Task.State.CREATED)
                .withInputPartitions(this.taskId03Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Set<Task> tasksFromCreator = Utils.mkSet(newActiveTask);
        // The stub and the call below use separately built but equal maps.
        Mockito.when(this.activeTaskCreator.createTasks(
                        this.consumer,
                        Utils.mkMap(Utils.mkEntry(newActiveTask.id(), newActiveTask.inputPartitions()))))
                .thenReturn(tasksFromCreator);

        // Act
        taskManager.handleAssignment(
                Utils.mkMap(Utils.mkEntry(newActiveTask.id(), newActiveTask.inputPartitions())),
                Collections.emptyMap());

        // Assert
        Mockito.verify(tasks).addPendingTasksToInit(tasksFromCreator);
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A previously unknown task id is assigned as standby. The standby task creator is
     * stubbed to return a new task for that assignment, and the test expects the returned
     * tasks to be registered as pending tasks to initialize while the active task creator is
     * invoked with an empty map.
     */
    @Test
    public void shouldCreateStandbyTaskDuringAssignment() {
        // Arrange
        StandbyTask newStandbyTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
                .inState(Task.State.CREATED)
                .withInputPartitions(this.taskId02Partitions)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Set<Task> tasksFromCreator = Utils.mkSet(newStandbyTask);
        Mockito.when(this.standbyTaskCreator.createTasks(
                        Utils.mkMap(Utils.mkEntry(newStandbyTask.id(), newStandbyTask.inputPartitions()))))
                .thenReturn(tasksFromCreator);

        // Act
        taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(newStandbyTask.id(), newStandbyTask.inputPartitions())));

        // Assert
        Mockito.verify(tasks).addPendingTasksToInit(tasksFromCreator);
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
    }

    /**
     * A running active task reported by the task registry is assigned as standby, with the
     * task manager set up using {@code true} as the last argument. The test expects the
     * active task to have its commit prepared, its producer released and itself removed from
     * the registry, and the stubbed standby replacement to be registered as a pending task to
     * initialize.
     */
    @Test
    public void shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled() {
        // Arrange: stubs are installed before the task manager is created, as in the original.
        StreamTask runningActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
                .withInputPartitions(this.taskId01Partitions)
                .inState(Task.State.RUNNING)
                .build();
        StandbyTask replacementStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions)
                .withInputPartitions(this.taskId01Partitions)
                .inState(Task.State.CREATED)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        Mockito.when(tasks.allNonFailedTasks()).thenReturn(Utils.mkSet(runningActive));
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(runningActive, this.taskId01Partitions))
                .thenReturn(replacementStandby);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

        // Act
        taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(this.taskId01, this.taskId01Partitions)));

        // Assert
        Mockito.verify(runningActive).prepareCommit();
        Mockito.verify(this.activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(runningActive.id());
        Mockito.verify(tasks).removeTask(runningActive);
        Mockito.verify(tasks).addPendingTasksToInit(Utils.mkSet(replacementStandby));
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A running active task reported by the task registry's {@code allTasks()} is assigned as
     * standby, with the task manager set up using {@code false} as the last argument. The
     * test expects the active task to have its commit prepared and its producer released, and
     * the registry to be told to replace the active task with the stubbed standby task.
     */
    @Test
    public void shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
        // Arrange: stubs are installed before the task manager is created, as in the original.
        StreamTask runningActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
                .withInputPartitions(this.taskId01Partitions)
                .inState(Task.State.RUNNING)
                .build();
        StandbyTask replacementStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions)
                .withInputPartitions(this.taskId01Partitions)
                .inState(Task.State.CREATED)
                .build();
        TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
        Mockito.when(tasks.allTasks()).thenReturn(Utils.mkSet(runningActive));
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(runningActive, this.taskId01Partitions))
                .thenReturn(replacementStandby);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);

        // Act
        taskManager.handleAssignment(
                Collections.emptyMap(),
                Utils.mkMap(Utils.mkEntry(this.taskId01, this.taskId01Partitions)));

        // Assert
        Mockito.verify(runningActive).prepareCommit();
        Mockito.verify(this.activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(runningActive.id());
        Mockito.verify(tasks).replaceActiveWithStandby(replacementStandby);
        Mockito.verify(this.activeTaskCreator).createTasks(this.consumer, Collections.emptyMap());
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
    }

    /**
     * A standby task that the registry reports via {@code allNonFailedTasks()} is assigned as an active
     * task while the task manager is set up with the state-updater flag {@code true}.
     * <p>
     * {@code handleAssignment} is expected to throw an {@link IllegalStateException} naming
     * {@code taskId03}, and the active task creator must not have been used at all.
     */
    @Test
    public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled() {
        StandbyTask standbyTaskToRecycle = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasks = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)tasks.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{standbyTaskToRecycle}));
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        IllegalStateException illegalStateException = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> taskManager.handleAssignment(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)standbyTaskToRecycle.id(), (Object)standbyTaskToRecycle.inputPartitions())}), Collections.emptyMap()));
        // Expected value first, actual second, so a failure report labels the two strings correctly.
        Assert.assertEquals((Object)("Standby tasks should only be managed by the state updater, but standby task " + this.taskId03 + " is managed by the stream thread"), (Object)illegalStateException.getMessage());
        Mockito.verifyNoInteractions((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * A RUNNING active task reported by the registry is absent from an empty assignment
     * (state-updater flag {@code true}).
     * <p>
     * The test expects the task to be prepared for commit, closed cleanly, removed from the registry,
     * and its producer to be closed. Both creators are invoked with empty maps.
     */
    @Test
    public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdaterEnabled() {
        // Arrange
        StreamTask unassignedActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{unassignedActive}));

        // Act: nothing is assigned any more.
        manager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());

        // Assert: the orphaned task goes through the clean-close path.
        ((StreamTask)Mockito.verify((Object)unassignedActive)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)unassignedActive)).closeClean();
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(unassignedActive.id());
        ((TasksRegistry)Mockito.verify((Object)registry)).removeTask((Task)unassignedActive);
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StandbyTaskCreator)Mockito.verify((Object)this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    /**
     * A standby task reported by the registry via {@code allNonFailedTasks()} is absent from an empty
     * assignment while the task manager is set up with the state-updater flag {@code true}.
     * <p>
     * {@code handleAssignment} is expected to throw an {@link IllegalStateException} naming
     * {@code taskId03}, and the active task creator must not have been used at all.
     */
    @Test
    public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistryWithStateUpdaterEnabled() {
        StandbyTask standbyTaskToClose = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasks = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when((Object)tasks.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{standbyTaskToClose}));
        IllegalStateException illegalStateException = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));
        // Expected value first, actual second, so a failure report labels the two strings correctly.
        Assert.assertEquals((Object)("Standby tasks should only be managed by the state updater, but standby task " + this.taskId03 + " is managed by the stream thread"), (Object)illegalStateException.getMessage());
        Mockito.verifyNoInteractions((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * A RUNNING active task reported by the registry is re-assigned with a different set of input
     * partitions (state-updater flag {@code true}).
     * <p>
     * The registry is stubbed to answer {@code true} from {@code updateActiveTaskInputPartitions};
     * the test then expects {@code updateInputPartitions} to be called on the task with the new
     * partitions, and both creators to be invoked with empty maps.
     */
    @Test
    public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStateUpdaterEnabled() {
        // Arrange: the task owns taskId03Partitions but will be assigned taskId02Partitions.
        Set<TopicPartition> reassignedPartitions = this.taskId02Partitions;
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{activeTask}));
        Mockito.when((Object)registry.updateActiveTaskInputPartitions((Task)activeTask, reassignedPartitions)).thenReturn((Object)true);

        // Act
        manager.handleAssignment(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask.id(), reassignedPartitions)}), Collections.emptyMap());

        // Assert
        ((StreamTask)Mockito.verify((Object)activeTask)).updateInputPartitions((Set)ArgumentMatchers.eq(reassignedPartitions), (Map)ArgumentMatchers.any());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StandbyTaskCreator)Mockito.verify((Object)this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    /**
     * A RUNNING active task reported by the registry is re-assigned with its own id and input
     * partitions (state-updater flag {@code true}).
     * <p>
     * The only expectations are that both task creators are invoked with empty maps.
     */
    @Test
    public void shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled() {
        // Arrange
        StreamTask runningActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{runningActive}));

        // Act: same task, same partitions.
        manager.handleAssignment(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)runningActive.id(), (Object)runningActive.inputPartitions())}), Collections.emptyMap());

        // Assert
        ((StandbyTaskCreator)Mockito.verify((Object)this.standbyTaskCreator)).createTasks(Collections.emptyMap());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
    }

    /**
     * A SUSPENDED active task reported by the registry is re-assigned with its own id and input
     * partitions (state-updater flag {@code true}).
     * <p>
     * The test expects the task to be resumed, added to the state updater and removed from the
     * registry, with both creators invoked with empty maps.
     */
    @Test
    public void shouldResumeActiveSuspendedTaskInTasksRegistryAndAddToStateUpdater() {
        // Arrange
        StreamTask suspendedActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.SUSPENDED).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{suspendedActive}));

        // Act
        manager.handleAssignment(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)suspendedActive.id(), (Object)suspendedActive.inputPartitions())}), Collections.emptyMap());

        // Assert: the task moves from the registry to the state updater.
        ((StreamTask)Mockito.verify((Object)suspendedActive)).resume();
        ((TasksRegistry)Mockito.verify((Object)registry)).removeTask((Task)suspendedActive);
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).add((Task)suspendedActive);
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StandbyTaskCreator)Mockito.verify((Object)this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    /**
     * A standby task reported by the registry via {@code allNonFailedTasks()} is re-assigned as a
     * standby with different input partitions while the task manager is set up with the
     * state-updater flag {@code true}.
     * <p>
     * {@code handleAssignment} is expected to throw an {@link IllegalStateException} naming
     * {@code taskId02}, and the active task creator must not have been used at all.
     */
    @Test
    public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistryWithStateUpdaterEnabled() {
        StandbyTask standbyTaskToUpdateInputPartitions = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        Set<TopicPartition> newInputPartitions = this.taskId03Partitions;
        TasksRegistry tasks = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when((Object)tasks.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{standbyTaskToUpdateInputPartitions}));
        IllegalStateException illegalStateException = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> taskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)standbyTaskToUpdateInputPartitions.id(), (Object)newInputPartitions)})));
        // Expected value first, actual second, so a failure report labels the two strings correctly.
        Assert.assertEquals((Object)("Standby tasks should only be managed by the state updater, but standby task " + this.taskId02 + " is managed by the stream thread"), (Object)illegalStateException.getMessage());
        Mockito.verifyNoInteractions((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * One RUNNING active task is in the registry but not in the assignment, while a different task id
     * is newly assigned as active (state-updater flag {@code true}).
     * <p>
     * The test expects the active creator to be asked to create the newly assigned task, the
     * unassigned task to be closed cleanly, and the standby creator to be invoked with an empty map.
     */
    @Test
    public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
        // Arrange: only the outgoing task is known to the registry.
        StreamTask outgoing = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        StreamTask incoming = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{outgoing}));

        // Act
        manager.handleAssignment(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)incoming.id(), (Object)incoming.inputPartitions())}), Collections.emptyMap());

        // Assert
        ((StreamTask)Mockito.verify((Object)outgoing)).closeClean();
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks(this.consumer, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)incoming.id(), (Object)incoming.inputPartitions())}));
        ((StandbyTaskCreator)Mockito.verify((Object)this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    /**
     * Tasks drained from the registry's pending-to-init set are initialized and handed to the state
     * updater when {@code checkStateUpdater} runs.
     * <p>
     * One active task and one standby task are returned by {@code drainPendingTasksToInit()}; both are
     * expected to see {@code initializeIfNeeded()} and to be added to the state updater.
     */
    @Test
    public void shouldAddTasksToStateUpdater() {
        // Arrange
        StreamTask pendingActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        StandbyTask pendingStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.drainPendingTasksToInit()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{pendingActive, pendingStandby}));
        this.taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        // Act
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        // Assert, task by task.
        ((StreamTask)Mockito.verify((Object)pendingActive)).initializeIfNeeded();
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).add((Task)pendingActive);
        ((StandbyTask)Mockito.verify((Object)pendingStandby)).initializeIfNeeded();
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).add((Task)pendingStandby);
    }

    /**
     * A task whose {@code initializeIfNeeded()} throws a {@code LockException} is put back into the
     * pending-to-init set instead of being added to the state updater.
     * <p>
     * The second pending task initializes normally and is expected to reach the state updater; the
     * collection passed to {@code addPendingTasksToInit} must contain only the task that failed to lock.
     */
    @Test
    public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
        // Arrange: the active task fails to initialize, the standby does not.
        StreamTask lockedActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        StandbyTask healthyStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.drainPendingTasksToInit()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{lockedActive, healthyStandby}));
        LockException lockFailure = new LockException("Where are my keys??");
        ((StreamTask)Mockito.doThrow((Throwable[])new Throwable[]{lockFailure}).when((Object)lockedActive)).initializeIfNeeded();
        this.taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        // Act
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        // Assert: initialization was attempted on both tasks ...
        ((StreamTask)Mockito.verify((Object)lockedActive)).initializeIfNeeded();
        ((StandbyTask)Mockito.verify((Object)healthyStandby)).initializeIfNeeded();
        // ... only the healthy one reached the state updater ...
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).add((Task)healthyStandby);
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater, (VerificationMode)Mockito.never())).add((Task)lockedActive);
        // ... and only the locked one was queued for another attempt.
        ((TasksRegistry)Mockito.verify((Object)registry)).addPendingTasksToInit((Collection)ArgumentMatchers.argThat(requeued -> requeued.contains(lockedActive) && !requeued.contains(healthyStandby)));
    }

    /**
     * A {@link RuntimeException} thrown from a pending task's {@code initializeIfNeeded()} surfaces from
     * {@code checkStateUpdater} as a {@code StreamsException}.
     * <p>
     * The wrapping exception is expected to carry the task id, the original exception as its cause and
     * the message checked below; the task must be registered as failed and never added to the state
     * updater.
     */
    @Test
    public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
        // Arrange
        StreamTask failingTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.CREATED).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.drainPendingTasksToInit()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{failingTask}));
        RuntimeException initFailure = new RuntimeException("KABOOM!");
        ((StreamTask)Mockito.doThrow((Throwable[])new Throwable[]{initFailure}).when((Object)failingTask)).initializeIfNeeded();
        this.taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        // Act
        StreamsException wrapped = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter));

        // Assert: bookkeeping on the mocks.
        ((TasksRegistry)Mockito.verify((Object)registry)).addFailedTask((Task)failingTask);
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater, (VerificationMode)Mockito.never())).add((Task)failingTask);

        // Assert: shape of the wrapping exception.
        Assert.assertTrue((boolean)wrapped.taskId().isPresent());
        Assert.assertEquals((Object)failingTask.id(), wrapped.taskId().get());
        Assert.assertEquals((Object)initFailure, (Object)wrapped.getCause());
        Assert.assertEquals((Object)"Encounter unexpected fatal error for task 0_0", (Object)wrapped.getMessage());
    }

    /**
     * Two of three pending tasks throw {@code TaskCorruptedException} from {@code initializeIfNeeded()};
     * {@code checkStateUpdater} is expected to throw a single {@code TaskCorruptedException} covering
     * both ids.
     * <p>
     * The two corrupted tasks must be registered as failed, while the third task still reaches the
     * state updater.
     */
    @Test
    public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
        // Arrange
        StreamTask corruptedFirst = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId00Partitions).build();
        StreamTask corruptedSecond = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId01Partitions).build();
        StreamTask intact = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, registry, true);
        Mockito.when((Object)registry.drainPendingTasksToInit()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{corruptedFirst, corruptedSecond, intact}));
        ((StreamTask)Mockito.doThrow((Throwable[])new Throwable[]{new TaskCorruptedException(Collections.singleton(corruptedFirst.id))}).when((Object)corruptedFirst)).initializeIfNeeded();
        ((StreamTask)Mockito.doThrow((Throwable[])new Throwable[]{new TaskCorruptedException(Collections.singleton(corruptedSecond.id))}).when((Object)corruptedSecond)).initializeIfNeeded();

        // Act
        TaskCorruptedException aggregated = (TaskCorruptedException)Assert.assertThrows(TaskCorruptedException.class, () -> manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter));

        // Assert
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).add((Task)intact);
        ((TasksRegistry)Mockito.verify((Object)registry)).addFailedTask((Task)corruptedFirst);
        ((TasksRegistry)Mockito.verify((Object)registry)).addFailedTask((Task)corruptedSecond);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01}), (Object)aggregated.corruptedTasks());
        // NOTE(review): the expected text hard-codes the order "0_1, 0_0"; presumably this mirrors the
        // iteration order of the corrupted-task set - confirm before changing the ids used here.
        Assert.assertEquals((Object)"Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", (Object)aggregated.getMessage());
    }

    /**
     * {@code checkStateUpdater} is expected to return {@code false} when the state updater is stubbed
     * to report {@code restoresActiveTasks() == true}.
     */
    @Test
    public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
        Mockito.when((Object)this.stateUpdater.restoresActiveTasks()).thenReturn((Object)true);
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        boolean result = manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        Assert.assertFalse((boolean)result);
    }

    /**
     * {@code checkStateUpdater} is expected to return {@code false} when the registry is stubbed to
     * report {@code hasPendingTasksToInit() == true} and nothing else is stubbed.
     */
    @Test
    public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() {
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.hasPendingTasksToInit()).thenReturn((Object)true);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        boolean result = manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        Assert.assertFalse((boolean)result);
    }

    /**
     * {@code checkStateUpdater} is expected to return {@code true} when neither the registry mock nor
     * the state updater has any stubbing applied in this test.
     */
    @Test
    public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit() {
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        boolean result = manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        Assert.assertTrue((boolean)result);
    }

    /**
     * Revoking the input partitions of a RESTORING active task held by the state updater.
     * <p>
     * The state updater's {@code remove} is stubbed with an already-completed future carrying the
     * task; the test expects the task to be removed from the state updater, suspended and added to
     * the registry.
     */
    @Test
    public void shouldSuspendActiveTaskWithRevokedInputPartitionsInStateUpdater() {
        // Arrange
        StreamTask restoringTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{restoringTask}), registry);
        // Stubs getTasks() once more, with a set holding the same single task.
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{restoringTask}));
        CompletableFuture<StateUpdater.RemovedTaskResult> removal = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)restoringTask));
        Mockito.when((Object)this.stateUpdater.remove(restoringTask.id())).thenReturn(removal);

        // Act
        manager.handleRevocation((Collection)restoringTask.inputPartitions());

        // Assert
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).remove(restoringTask.id());
        ((StreamTask)Mockito.verify((Object)restoringTask)).suspend();
        ((TasksRegistry)Mockito.verify((Object)registry)).addTask((Task)restoringTask);
    }

    /**
     * Revoking the union of the input partitions of two RESTORING active tasks held by the state
     * updater.
     * <p>
     * Each {@code remove} call is stubbed with an already-completed future carrying the respective
     * task; both tasks are expected to be suspended and added to the registry.
     */
    @Test
    public void shouldSuspendMultipleActiveTasksWithRevokedInputPartitionsInStateUpdater() {
        // Arrange
        StreamTask firstRestoring = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask secondRestoring = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{firstRestoring, secondRestoring}), registry);
        CompletableFuture<StateUpdater.RemovedTaskResult> firstRemoval = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)firstRestoring));
        Mockito.when((Object)this.stateUpdater.remove(firstRestoring.id())).thenReturn(firstRemoval);
        CompletableFuture<StateUpdater.RemovedTaskResult> secondRemoval = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)secondRestoring));
        Mockito.when((Object)this.stateUpdater.remove(secondRestoring.id())).thenReturn(secondRemoval);

        // Act: revoke everything both tasks read from.
        manager.handleRevocation((Collection)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));

        // Assert
        ((StreamTask)Mockito.verify((Object)firstRestoring)).suspend();
        ((StreamTask)Mockito.verify((Object)secondRestoring)).suspend();
        ((TasksRegistry)Mockito.verify((Object)registry)).addTask((Task)firstRestoring);
        ((TasksRegistry)Mockito.verify((Object)registry)).addTask((Task)secondRestoring);
    }

    /**
     * Revoking partitions that belong to a different task id must leave a RESTORING active task in the
     * state updater untouched: no suspend, no registry add, no removal from the state updater.
     */
    @Test
    public void shouldNotSuspendActiveTaskWithoutRevokedInputPartitionsInStateUpdater() {
        // Arrange: the task reads taskId00Partitions.
        StreamTask restoringTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{restoringTask}), registry);

        // Act: taskId01Partitions are revoked instead.
        manager.handleRevocation(this.taskId01Partitions);

        // Assert
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater, (VerificationMode)Mockito.never())).remove(restoringTask.id());
        ((StreamTask)Mockito.verify((Object)restoringTask, (VerificationMode)Mockito.never())).suspend();
        ((TasksRegistry)Mockito.verify((Object)registry, (VerificationMode)Mockito.never())).addTask((Task)restoringTask);
    }

    /**
     * Revoking the input partitions of a standby task held by the state updater must leave that
     * standby untouched: no suspend, no registry add, no removal from the state updater.
     */
    @Test
    public void shouldNotRevokeStandbyTaskInStateUpdaterOnRevocation() {
        // Arrange
        StandbyTask standby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{standby}), registry);

        // Act: the standby's own partitions are revoked.
        manager.handleRevocation(this.taskId00Partitions);

        // Assert
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater, (VerificationMode)Mockito.never())).remove(standby.id());
        ((StandbyTask)Mockito.verify((Object)standby, (VerificationMode)Mockito.never())).suspend();
        ((TasksRegistry)Mockito.verify((Object)registry, (VerificationMode)Mockito.never())).addTask((Task)standby);
    }

    /**
     * Revoking the partitions of two RESTORING active tasks where the removal of the second one from
     * the state updater reports an exception.
     * <p>
     * {@code handleRevocation} is expected to throw a {@code StreamsException} whose message names the
     * failed task and whose cause is the reported exception. The healthy task is still suspended and
     * added to the registry; the failed task is not suspended and is registered as failed.
     */
    @Test
    public void shouldThrowIfRevokingTasksInStateUpdaterFindsFailedTasks() {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        TasksRegistry tasks = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{task1, task2}), tasks);
        // task1 is removed without error.
        CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        Mockito.when((Object)this.stateUpdater.remove(task1.id())).thenReturn(future1);
        future1.complete(new StateUpdater.RemovedTaskResult((Task)task1));
        // task2's removal result carries an exception.
        CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        Mockito.when((Object)this.stateUpdater.remove(task2.id())).thenReturn(future2);
        RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
        future2.complete(new StateUpdater.RemovedTaskResult((Task)task2, taskException));
        StreamsException thrownException = (StreamsException)Assert.assertThrows(StreamsException.class, () -> taskManager.handleRevocation((Collection)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions})));
        Assert.assertEquals((Object)("Encounter unexpected fatal error for task " + task2.id()), (Object)thrownException.getMessage());
        // Expected value first, actual second, matching the message assertion above.
        Assert.assertEquals((Object)taskException, (Object)thrownException.getCause());
        ((StreamTask)Mockito.verify((Object)task1)).suspend();
        ((TasksRegistry)Mockito.verify((Object)tasks)).addTask((Task)task1);
        ((StreamTask)Mockito.verify((Object)task2, (VerificationMode)Mockito.never())).suspend();
        ((TasksRegistry)Mockito.verify((Object)tasks)).addFailedTask((Task)task2);
    }

    /**
     * {@code handleLostAll} with two RESTORING active tasks and one standby task in the state updater.
     * <p>
     * Both active tasks are removed without error and are expected to be suspended and closed cleanly;
     * the standby task must never be removed from the state updater.
     */
    @Test
    public void shouldCloseCleanWhenRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
        // Arrange
        StreamTask firstActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StandbyTask standby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        StreamTask secondActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{firstActive, standby, secondActive}), registry);
        CompletableFuture<StateUpdater.RemovedTaskResult> firstRemoval = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)firstActive));
        Mockito.when((Object)this.stateUpdater.remove(firstActive.id())).thenReturn(firstRemoval);
        CompletableFuture<StateUpdater.RemovedTaskResult> secondRemoval = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)secondActive));
        Mockito.when((Object)this.stateUpdater.remove(secondActive.id())).thenReturn(secondRemoval);

        // Act
        manager.handleLostAll();

        // Assert
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater, (VerificationMode)Mockito.never())).remove(standby.id());
        ((StreamTask)Mockito.verify((Object)firstActive)).suspend();
        ((StreamTask)Mockito.verify((Object)firstActive)).closeClean();
        ((StreamTask)Mockito.verify((Object)secondActive)).suspend();
        ((StreamTask)Mockito.verify((Object)secondActive)).closeClean();
    }

    /**
     * {@code handleLostAll} with an empty state updater and two CREATED active tasks returned by the
     * registry's {@code drainPendingActiveTasksToInit()}.
     * <p>
     * Both pending tasks are expected to be suspended and closed cleanly.
     */
    @Test
    public void shouldCloseCleanTasksPendingInitOnPartitionLost() {
        // Arrange
        StreamTask firstPending = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId00Partitions).build();
        StreamTask secondPending = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.drainPendingActiveTasksToInit()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{firstPending, secondPending}));
        TaskManager manager = this.setupForRevocationAndLost(Collections.emptySet(), registry);

        // Act
        manager.handleLostAll();

        // Assert
        ((StreamTask)Mockito.verify((Object)firstPending)).suspend();
        ((StreamTask)Mockito.verify((Object)secondPending)).suspend();
        ((StreamTask)Mockito.verify((Object)firstPending)).closeClean();
        ((StreamTask)Mockito.verify((Object)secondPending)).closeClean();
    }

    /**
     * {@code handleLostAll} with two RESTORING active tasks whose removal results from the state
     * updater each carry a {@code StreamsException}.
     * <p>
     * Both tasks are expected to see {@code prepareCommit()}, {@code suspend()} and
     * {@code closeDirty()}.
     */
    @Test
    public void shouldCloseDirtyWhenRemoveFailedActiveTasksFromStateUpdaterOnPartitionLost() {
        // Arrange
        StreamTask firstFailed = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask secondFailed = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{firstFailed, secondFailed}), registry);
        CompletableFuture<StateUpdater.RemovedTaskResult> firstRemoval = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)firstFailed, (RuntimeException)((Object)new StreamsException("Something happened"))));
        Mockito.when((Object)this.stateUpdater.remove(firstFailed.id())).thenReturn(firstRemoval);
        CompletableFuture<StateUpdater.RemovedTaskResult> secondRemoval = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)secondFailed, (RuntimeException)((Object)new StreamsException("Something else happened"))));
        Mockito.when((Object)this.stateUpdater.remove(secondFailed.id())).thenReturn(secondRemoval);

        // Act
        manager.handleLostAll();

        // Assert: same dirty-close sequence for both tasks.
        ((StreamTask)Mockito.verify((Object)firstFailed)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)firstFailed)).suspend();
        ((StreamTask)Mockito.verify((Object)firstFailed)).closeDirty();
        ((StreamTask)Mockito.verify((Object)secondFailed)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)secondFailed)).suspend();
        ((StreamTask)Mockito.verify((Object)secondFailed)).closeDirty();
    }

    /**
     * {@code handleLostAll} with a mix of tasks: one CREATED task pending initialization in the
     * registry, one RESTORING task whose removal result carries a {@code StreamsException}, and one
     * RESTORING task removed without error.
     * <p>
     * The pending task and the cleanly removed task are expected to be suspended and closed cleanly;
     * the failed task is expected to see {@code prepareCommit()}, {@code suspend()} and
     * {@code closeDirty()}.
     */
    @Test
    public void shouldCloseTasksWhenRemoveFailedActiveTasksFromStateUpdaterOnPartitionLost() {
        // Arrange
        StreamTask pendingInit = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId00Partitions).build();
        StreamTask failedRemoval = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId02Partitions).build();
        StreamTask cleanRemoval = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.drainPendingActiveTasksToInit()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{pendingInit}));
        TaskManager manager = this.setupForRevocationAndLost(Utils.mkSet((Object[])new Task[]{failedRemoval, cleanRemoval}), registry);
        CompletableFuture<StateUpdater.RemovedTaskResult> failedResult = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)failedRemoval, (RuntimeException)((Object)new StreamsException("Something happened"))));
        Mockito.when((Object)this.stateUpdater.remove(failedRemoval.id())).thenReturn(failedResult);
        CompletableFuture<StateUpdater.RemovedTaskResult> cleanResult = CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult((Task)cleanRemoval));
        Mockito.when((Object)this.stateUpdater.remove(cleanRemoval.id())).thenReturn(cleanResult);

        // Act
        manager.handleLostAll();

        // Assert: clean path for the pending and the cleanly removed task.
        ((StreamTask)Mockito.verify((Object)pendingInit)).suspend();
        ((StreamTask)Mockito.verify((Object)pendingInit)).closeClean();
        ((StreamTask)Mockito.verify((Object)cleanRemoval)).suspend();
        ((StreamTask)Mockito.verify((Object)cleanRemoval)).closeClean();
        // Assert: dirty path for the task whose removal failed.
        ((StreamTask)Mockito.verify((Object)failedRemoval)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)failedRemoval)).suspend();
        ((StreamTask)Mockito.verify((Object)failedRemoval)).closeDirty();
    }

    /**
     * Creates a task manager in at-least-once mode around {@code tasks} and stubs the mocked state
     * updater so that {@code getTasks()} returns {@code tasksInStateUpdater}.
     *
     * @param tasksInStateUpdater tasks the state updater mock should report as owned
     * @param tasks registry handed to the task manager
     * @return the task manager under test
     */
    private TaskManager setupForRevocationAndLost(Set<Task> tasksInStateUpdater, TasksRegistry tasks) {
        // NOTE(review): the trailing `true` presumably enables the state updater - confirm against setUpTaskManager.
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
        return manager;
    }

    /**
     * A single restored task drained from the state updater must go through the full
     * transition checked by {@code verifyTransitionToRunningOfRestoredTask}.
     */
    @Test
    public void shouldTransitRestoredTaskToRunning() {
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        StreamTask restoredTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TaskManager manager = this.setUpTransitionToRunningOfRestoredTask(Utils.mkSet((Object[])new StreamTask[]{restoredTask}), registry);

        manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        this.verifyTransitionToRunningOfRestoredTask(Utils.mkSet((Object[])new StreamTask[]{restoredTask}), registry);
    }

    /**
     * Same as the single-task case, but with two restored tasks drained from the state updater;
     * both must pass the checks in {@code verifyTransitionToRunningOfRestoredTask}.
     */
    @Test
    public void shouldTransitMultipleRestoredTasksToRunning() {
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        StreamTask firstRestored = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask secondRestored = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        TaskManager manager = this.setUpTransitionToRunningOfRestoredTask(Utils.mkSet((Object[])new StreamTask[]{firstRestored, secondRestored}), registry);

        manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        this.verifyTransitionToRunningOfRestoredTask(Utils.mkSet((Object[])new StreamTask[]{firstRestored, secondRestored}), registry);
    }

    /**
     * For every task in {@code restoredTasks}, verifies that restoration was completed with the
     * no-op resetter, the task timeout was cleared, the task was added to {@code tasks}, and the
     * consumer was resumed for the task's input partitions.
     *
     * @param restoredTasks tasks expected to have been transitioned
     * @param tasks registry mock the tasks should have been added to
     */
    private void verifyTransitionToRunningOfRestoredTask(Set<StreamTask> restoredTasks, TasksRegistry tasks) {
        restoredTasks.forEach(transitioned -> {
            ((StreamTask)Mockito.verify((Object)transitioned)).completeRestoration(this.noOpResetter);
            ((StreamTask)Mockito.verify((Object)transitioned)).clearTaskTimeout();
            ((TasksRegistry)Mockito.verify((Object)tasks)).addTask((Task)transitioned);
            ((Consumer)Mockito.verify(this.consumer)).resume((Collection)transitioned.inputPartitions());
        });
    }

    /**
     * When {@code completeRestoration} throws a {@link TimeoutException}, the task must have its
     * task timeout initialised with that exception and be handed back to the state updater; it
     * must not be added to the registry, must not have its timeout cleared, and the consumer must
     * not be touched.
     */
    @Test
    public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        StreamTask restoredTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TaskManager manager = this.setUpTransitionToRunningOfRestoredTask(Utils.mkSet((Object[])new StreamTask[]{restoredTask}), registry);
        TimeoutException restorationTimeout = new TimeoutException();
        ((StreamTask)Mockito.doThrow((Throwable[])new Throwable[]{restorationTimeout}).when((Object)restoredTask)).completeRestoration(this.noOpResetter);

        manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);

        // Interactions that must have happened.
        ((StreamTask)Mockito.verify((Object)restoredTask)).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), (Exception)ArgumentMatchers.eq((Object)((Object)restorationTimeout)));
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).add((Task)restoredTask);

        // Interactions that must not have happened.
        ((StreamTask)Mockito.verify((Object)restoredTask, (VerificationMode)Mockito.never())).clearTaskTimeout();
        ((TasksRegistry)Mockito.verify((Object)registry, (VerificationMode)Mockito.never())).addTask((Task)restoredTask);
        Mockito.verifyNoInteractions((Object[])new Object[]{this.consumer});
    }

    /**
     * Stubs the mocked state updater to hand out {@code statefulTasks} from
     * {@code drainRestoredActiveTasks} and to report that it restores active tasks, then builds
     * an at-least-once task manager around {@code tasks}.
     *
     * @param statefulTasks tasks the state updater mock returns as restored
     * @param tasks registry handed to the task manager
     * @return the task manager under test
     */
    private TaskManager setUpTransitionToRunningOfRestoredTask(Set<StreamTask> statefulTasks, TasksRegistry tasks) {
        Mockito.when((Object)this.stateUpdater.drainRestoredActiveTasks((Duration)ArgumentMatchers.any(Duration.class))).thenReturn(statefulTasks);
        Mockito.when((Object)this.stateUpdater.restoresActiveTasks()).thenReturn((Object)true);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
        return manager;
    }

    /**
     * {@code checkStateUpdater} must return {@code true} while the state updater reports that it
     * restores no active tasks, and {@code false} once it reports that it does.
     */
    @Test
    public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);

        Mockito.when((Object)this.stateUpdater.restoresActiveTasks()).thenReturn((Object)false);
        boolean resultWithoutRestoringTasks = manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        Assert.assertTrue(resultWithoutRestoringTasks);

        Mockito.when((Object)this.stateUpdater.restoresActiveTasks()).thenReturn((Object)true);
        boolean resultWithRestoringTasks = manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        Assert.assertFalse(resultWithRestoringTasks);
    }

    /**
     * A {@code StreamsException} drained from the state updater together with its failed task
     * must be rethrown by {@code checkStateUpdater} as the same exception, carrying the id of
     * the failed task.
     */
    @Test
    public void shouldRethrowStreamsExceptionFromStateUpdater() {
        StreamTask failedTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamsException expected = new StreamsException("boom!");
        StateUpdater.ExceptionAndTask failure = new StateUpdater.ExceptionAndTask((RuntimeException)((Object)expected), (Task)failedTask);
        Mockito.when((Object)this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(failure));
        Mockito.when((Object)this.stateUpdater.hasExceptionsAndFailedTasks()).thenReturn((Object)true);
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter));

        Assert.assertEquals((Object)((Object)expected), (Object)((Object)thrown));
        Assert.assertEquals((Object)failedTask.id(), thrown.taskId().get());
    }

    /**
     * Two {@code TaskCorruptedException}s drained from the state updater (one per task) must
     * surface from {@code checkStateUpdater} as a single {@code TaskCorruptedException} whose
     * corrupted-task set contains both task ids and whose message matches the expected text.
     */
    @Test
    public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
        StreamTask corruptedTask0 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask corruptedTask1 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        StateUpdater.ExceptionAndTask failure0 = new StateUpdater.ExceptionAndTask((RuntimeException)new TaskCorruptedException(Collections.singleton(this.taskId00)), (Task)corruptedTask0);
        StateUpdater.ExceptionAndTask failure1 = new StateUpdater.ExceptionAndTask((RuntimeException)new TaskCorruptedException(Collections.singleton(this.taskId01)), (Task)corruptedTask1);
        Mockito.when((Object)this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(failure0, failure1));
        Mockito.when((Object)this.stateUpdater.hasExceptionsAndFailedTasks()).thenReturn((Object)true);
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);

        TaskCorruptedException thrown = (TaskCorruptedException)Assert.assertThrows(TaskCorruptedException.class, () -> manager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter));

        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01}), (Object)thrown.corruptedTasks());
        Assert.assertEquals((Object)"Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", (Object)thrown.getMessage());
    }

    /**
     * Verifies that {@code handleAssignment} passes exactly the input partitions of the active
     * tasks to {@code addSubscribedTopicsFromAssignment} on the topology builder, never the
     * standby partitions, and that the active task creator is asked to create the active tasks.
     *
     * <p>The {@code @Test} annotation is required: without it JUnit does not discover this
     * method and none of the verifications below ever run.
     */
    @Test
    public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
        Map activeTasksAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId01, (Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p1})), Utils.mkEntry((Object)this.taskId02, (Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p2, this.t2p2}))});
        Map standbyTasksAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId03, (Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p3})), Utils.mkEntry((Object)this.taskId04, (Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p4}))});
        Mockito.when((Object)this.standbyTaskCreator.createTasks(standbyTasksAssignment)).thenReturn(Collections.emptySet());

        this.taskManager.handleAssignment(activeTasksAssignment, standbyTasksAssignment);

        // Only the union of the active tasks' partitions may be forwarded to the topology builder.
        ((InternalTopologyBuilder)Mockito.verify((Object)this.topologyBuilder)).addSubscribedTopicsFromAssignment((Set)ArgumentMatchers.eq((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t1p2, this.t2p2})), ArgumentMatchers.anyString());
        ((InternalTopologyBuilder)Mockito.verify((Object)this.topologyBuilder, (VerificationMode)Mockito.never())).addSubscribedTopicsFromAssignment((Set)ArgumentMatchers.eq((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p3, this.t1p4})), ArgumentMatchers.anyString());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)activeTasksAssignment));
    }

    /**
     * With the state directory reporting no non-empty task directories, starting a rebalance
     * must leave the set of locked task directories empty.
     */
    @Test
    public void shouldNotLockAnythingIfStateDirIsEmpty() {
        Mockito.when((Object)this.stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList());

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        boolean nothingLocked = this.taskManager.lockedTaskDirectories().isEmpty();
        Assert.assertTrue(nothingLocked);
    }

    /**
     * With one lockable task directory, one directory whose lock attempt fails, and a folder
     * named "dummy", only the lockable task must end up in the locked task directories after
     * the rebalance starts.
     */
    @Test
    public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
        this.expectLockObtainedFor(this.taskId01);
        this.expectLockFailedFor(this.taskId10);
        this.expectDirectoryNotEmpty(this.taskId01);
        String lockableFolder = this.taskId01.toString();
        String unlockableFolder = this.taskId10.toString();
        this.makeTaskFolders(lockableFolder, unlockableFolder, "dummy");

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(Collections.singleton(this.taskId01)));
    }

    /**
     * Both task directories can be locked, but the one the state directory reports as empty
     * must be unlocked again at rebalance start, leaving only the non-empty one locked.
     */
    @Test
    public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
        this.expectLockObtainedFor(this.taskId01, this.taskId10);
        this.expectDirectoryNotEmpty(this.taskId01);
        Mockito.when((Object)this.stateDirectory.directoryForTaskIsEmpty(this.taskId10)).thenReturn((Object)true);
        String nonEmptyFolder = this.taskId01.toString();
        String emptyFolder = this.taskId10.toString();
        this.makeTaskFolders(nonEmptyFolder, emptyFolder);

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        ((StateDirectory)Mockito.verify((Object)this.stateDirectory)).unlock(this.taskId10);
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(Collections.singleton(this.taskId01)));
    }

    /**
     * Using the default {@code taskManager} fixture, completing a rebalance must pause the
     * consumer on its whole assignment.
     */
    @Test
    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
        Set assignedPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1});
        Mockito.when((Object)this.consumer.assignment()).thenReturn((Object)assignedPartitions);

        this.taskManager.handleRebalanceComplete();

        ((Consumer)Mockito.verify(this.consumer)).pause((Collection)assignedPartitions);
    }

    /**
     * With a RUNNING stateful task reported by the registry as non-failed, completing a
     * rebalance must pause the consumer only on {@code t1p1} out of the assigned
     * {@code t1p0} and {@code t1p1}.
     */
    @Test
    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
        StreamTask runningTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{runningTask}));
        Set assignedPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1});
        Mockito.when((Object)this.consumer.assignment()).thenReturn((Object)assignedPartitions);

        manager.handleRebalanceComplete();

        Set expectedPaused = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1});
        ((Consumer)Mockito.verify(this.consumer)).pause((Collection)expectedPaused);
    }

    /**
     * Three task directories are locked at rebalance start. After an assignment covering only
     * the first two tasks and rebalance completion, the third directory must be unlocked and
     * the consumer paused on {@code assignment}.
     */
    @Test
    public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
        this.expectLockObtainedFor(this.taskId00, this.taskId01, this.taskId02);
        this.expectDirectoryNotEmpty(this.taskId00, this.taskId01, this.taskId02);
        this.makeTaskFolders(this.taskId00.toString(), this.taskId01.toString(), this.taskId02.toString());

        // Rebalance start: every directory is locked.
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01, this.taskId02})));

        // Assignment without taskId02, then rebalance completion.
        this.handleAssignment(this.taskId00Assignment, this.taskId01Assignment, Collections.emptyMap());
        this.taskManager.handleRebalanceComplete();

        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01})));
        ((StateDirectory)Mockito.verify((Object)this.stateDirectory)).unlock(this.taskId02);
        ((Consumer)Mockito.verify(this.consumer)).pause(this.assignment);
    }

    /**
     * With one running task in the registry and a restoring task plus a standby task in the
     * state updater, a rebalance start/complete cycle must pause the consumer on {@code t1p1}
     * and {@code t1p2}, unlock the directory of the task owned by neither, and keep the other
     * three directories locked.
     */
    @Test
    public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() throws Exception {
        StreamTask runningTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask restoringTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        StandbyTask ownedStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        // Built but not referenced again: it is in neither the registry nor the state updater.
        StandbyTask unownedStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allTasksPerId()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)runningTask)}));
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{ownedStandby, restoringTask}));
        Mockito.when((Object)registry.allNonFailedTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{runningTask}));
        this.expectLockObtainedFor(this.taskId00, this.taskId01, this.taskId02, this.taskId03);
        this.expectDirectoryNotEmpty(this.taskId00, this.taskId01, this.taskId02, this.taskId03);
        this.makeTaskFolders(this.taskId00.toString(), this.taskId01.toString(), this.taskId02.toString(), this.taskId03.toString());
        Set assignedPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2});
        Mockito.when((Object)this.consumer.assignment()).thenReturn((Object)assignedPartitions);

        manager.handleRebalanceStart(Collections.singleton("topic"));
        manager.handleRebalanceComplete();

        Set expectedPaused = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t1p2});
        ((Consumer)Mockito.verify(this.consumer)).pause((Collection)expectedPaused);
        ((StateDirectory)Mockito.verify((Object)this.stateDirectory)).unlock(this.taskId03);
        MatcherAssert.assertThat((Object)manager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01, this.taskId02})));
    }

    /**
     * When both changelog partitions report offset {@code -2L}, the offset sum reported for
     * the task must also be {@code -2L}.
     */
    @Test
    public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
        // NOTE(review): -2L presumably is the "latest offset" sentinel the test name refers to - confirm.
        Map offsetsPerChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)-2L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)-2L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)-2L)});
        this.computeOffsetSumAndVerify(offsetsPerChangelog, expectedSums);
    }

    /**
     * Changelog offsets 5 and 10 must add up to an offset sum of 15 for the task.
     */
    @Test
    public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception {
        Map offsetsPerChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});
        this.computeOffsetSumAndVerify(offsetsPerChangelog, expectedSums);
    }

    /**
     * For a restoring active task owned by the state updater, the reported offset sum must be
     * the offset returned by the task's {@code changelogOffsets()} (42), not the different
     * offset written to the checkpoint file (24).
     */
    @Test
    public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throws Exception {
        long changelogOffset = 42L;
        long checkpointedOffset = 24L;
        StreamTask restoringTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).build();
        Mockito.when((Object)restoringTask.changelogOffsets()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p0changelog, (Object)changelogOffset)}));
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        Map checkpointContent = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p0changelog, (Object)checkpointedOffset)});
        this.writeCheckpointFile(this.taskId00, checkpointContent);
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{restoringTask}));

        manager.handleRebalanceStart(Collections.singleton("topic"));

        MatcherAssert.assertThat((Object)manager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)changelogOffset)})));
    }

    /**
     * For a standby task owned by the state updater, the reported offset sum must be the
     * offset returned by the task's {@code changelogOffsets()} (42), not the different offset
     * written to the checkpoint file (24).
     */
    @Test
    public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws Exception {
        long changelogOffset = 42L;
        long checkpointedOffset = 24L;
        StandbyTask standby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).build();
        Mockito.when((Object)standby.changelogOffsets()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p0changelog, (Object)changelogOffset)}));
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        Map checkpointContent = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p0changelog, (Object)checkpointedOffset)});
        this.writeCheckpointFile(this.taskId00, checkpointContent);
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{standby}));

        manager.handleRebalanceStart(Collections.singleton("topic"));

        MatcherAssert.assertThat((Object)manager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)changelogOffset)})));
    }

    /**
     * Offset sums must be reported for the running task held by the registry as well as for
     * the restoring active task and the standby task held by the state updater, each with the
     * offset its {@code changelogOffsets()} returns.
     */
    @Test
    public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
        long changelogOffsetOfRunningTask = 42L;
        long changelogOffsetOfRestoringStatefulTask = 24L;
        long changelogOffsetOfRestoringStandbyTask = 84L;
        StreamTask runningTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).build();
        StreamTask restoringTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).build();
        StandbyTask standby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).build();
        Mockito.when((Object)runningTask.changelogOffsets()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p0changelog, (Object)changelogOffsetOfRunningTask)}));
        Mockito.when((Object)restoringTask.changelogOffsets()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p1changelog, (Object)changelogOffsetOfRestoringStatefulTask)}));
        Mockito.when((Object)standby.changelogOffsets()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.t1p2changelog, (Object)changelogOffsetOfRestoringStandbyTask)}));
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.allTasksPerId()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)runningTask)}));
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{standby, restoringTask}));

        MatcherAssert.assertThat((Object)manager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)changelogOffsetOfRunningTask), Utils.mkEntry((Object)this.taskId01, (Object)changelogOffsetOfRestoringStatefulTask), Utils.mkEntry((Object)this.taskId02, (Object)changelogOffsetOfRestoringStandbyTask)})));
    }

    /**
     * A changelog partition reporting offset {@code -4L} must not contribute to the offset
     * sum: with the other partition at 10, the expected sum is 10.
     */
    @Test
    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
        // NOTE(review): -4L presumably is the "unknown offset" sentinel the test name refers to - confirm.
        Map offsetsPerChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)-4L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)10L)});
        this.computeOffsetSumAndVerify(offsetsPerChangelog, expectedSums);
    }

    /**
     * Locks and creates the folder for {@code taskId00}, starts a rebalance, assigns the task
     * through the third argument of {@code handleAssignment}, sets {@code changelogOffsets} on
     * it, and asserts that the task manager reports {@code expectedOffsetSums}.
     *
     * @param changelogOffsets offsets to set on the assigned task
     * @param expectedOffsetSums offset sums the task manager is expected to report
     * @throws Exception propagated from the fixture helpers
     */
    private void computeOffsetSumAndVerify(Map<TopicPartition, Long> changelogOffsets, Map<TaskId, Long> expectedOffsetSums) throws Exception {
        this.expectLockObtainedFor(this.taskId00);
        this.expectDirectoryNotEmpty(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        StateMachineTask assignedTask = this.handleAssignment(Collections.emptyMap(), Collections.emptyMap(), this.taskId00Assignment).get(this.taskId00);
        assignedTask.setChangelogOffsets(changelogOffsets);

        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is(expectedOffsetSums));
    }

    /**
     * With {@code taskId00} assigned through the second argument of {@code handleAssignment},
     * changelog offsets 5 and 10 must yield an offset sum of 15.
     */
    @Test
    public void shouldComputeOffsetSumForStandbyTask() throws Exception {
        Map offsetsPerChangelog = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});
        this.expectLockObtainedFor(this.taskId00);
        this.expectDirectoryNotEmpty(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        StateMachineTask assignedTask = this.handleAssignment(Collections.emptyMap(), this.taskId00Assignment, Collections.emptyMap()).get(this.taskId00);
        assignedTask.setChangelogOffsets(offsetsPerChangelog);

        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * With no assignment made, a lockable task directory whose checkpoint file holds offsets
     * 5 and 10 must be reported with an offset sum of 15.
     */
    @Test
    public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception {
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * For an assigned task that is still in state CREATED, the offset sum must come out as
     * 15, matching the offsets 5 and 10 written to the checkpoint file.
     */
    @Test
    public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception {
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        StateMachineTask createdTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singleton(createdTask));
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());

        // The task was only assigned, so it must still be in its initial state.
        MatcherAssert.assertThat((Object)createdTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * For an assigned task that has been suspended and closed clean (state CLOSED), the offset
     * sum must come out as 15, matching the offsets 5 and 10 written to the checkpoint file.
     */
    @Test
    public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception {
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 0), (Object)5L), Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)10L)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)15L)});
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);
        StateMachineTask taskToClose = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singleton(taskToClose));
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());

        // Drive the task to CLOSED directly on the task object.
        taskToClose.suspend();
        taskToClose.closeClean();

        MatcherAssert.assertThat((Object)taskToClose.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * When locking the task directory fails, neither a locked directory nor an offset sum
     * must be reported after the rebalance starts.
     */
    @Test
    public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
        this.expectLockFailedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        boolean nothingLocked = this.taskManager.lockedTaskDirectories().isEmpty();
        Assert.assertTrue(nothingLocked);
        boolean noOffsetSums = this.taskManager.getTaskOffsetSums().isEmpty();
        Assert.assertTrue(noOffsetSums);
    }

    /**
     * A lockable, non-empty task directory for which no checkpoint file is written in this
     * test must not produce an offset sum after the rebalance starts.
     */
    @Test
    public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.expectDirectoryNotEmpty(this.taskId00);
        // Only the checkpoint file handle is stubbed; no checkpoint content is written here.
        Mockito.when((Object)this.stateDirectory.checkpointFileFor(this.taskId00)).thenReturn((Object)this.getCheckpointFile(this.taskId00));

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        boolean noOffsetSums = this.taskManager.getTaskOffsetSums().isEmpty();
        Assert.assertTrue(noOffsetSums);
    }

    /**
     * Three checkpointed offsets of {@code Long.MAX_VALUE / 2} overflow a {@code long} when
     * added up; the reported offset sum must then be {@code Long.MAX_VALUE}.
     */
    @Test
    public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception {
        // Same value as 0x3FFFFFFFFFFFFFFFL.
        long largeOffset = Long.MAX_VALUE / 2L;
        Map checkpointedOffsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("changelog", 1), (Object)largeOffset), Utils.mkEntry((Object)new TopicPartition("changelog", 2), (Object)largeOffset), Utils.mkEntry((Object)new TopicPartition("changelog", 3), (Object)largeOffset)});
        Map expectedSums = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)Long.MAX_VALUE)});
        this.expectLockObtainedFor(this.taskId00);
        this.makeTaskFolders(this.taskId00.toString());
        this.writeCheckpointFile(this.taskId00, checkpointedOffsets);

        this.taskManager.handleRebalanceStart(Collections.singleton("topic"));

        MatcherAssert.assertThat((Object)this.taskManager.getTaskOffsetSums(), (Matcher)Matchers.is((Object)expectedSums));
    }

    /**
     * Walks one active task through assignment (RUNNING), revocation (SUSPENDED) and an empty
     * re-assignment (CLOSED), then checks that the task manager holds no active or standby
     * tasks and that the task producer was closed and removed for the task.
     */
    @Test
    public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(committableOffsets);
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(activeTask));

        // Assign the task and let restoration complete.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Revoke its partitions.
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));

        // A follow-up assignment without the task closes it.
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    /**
     * Uses a task whose {@code closeClean()} always throws. After the task is revoked and dropped
     * from the assignment, the task manager is expected to surface a {@link RuntimeException}
     * wrapping the original failure while the task still ends up closed and its producer released.
     */
    @Test
    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public void closeClean() {
                // Simulate a clean close that cannot succeed.
                throw new RuntimeException("KABOOM!");
            }
        };
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(failingTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleRevocation(this.taskId00Partitions);

        RuntimeException failure = Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));

        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)Matchers.is((Object)"Encounter unexpected fatal error for task 0_0"));
        MatcherAssert.assertThat((Object)failure.getCause().getMessage(), (Matcher)Matchers.is((Object)"KABOOM!"));

        ActiveTaskCreator creatorVerification = (ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator);
        creatorVerification.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    /**
     * Runs one active and one standby task, then calls {@code handleLostAll()}. The active task is
     * expected to be closed (with its producer released) while the standby task keeps running and
     * stays tracked. Both task directories remain locked until the next rebalance start, at which
     * point the state directory is stubbed to report no non-empty task directories.
     */
    @Test
    public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager);
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(activeTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(standbyTask));

        // One on-disk directory per task; reported on the first listing only, the second listing is empty.
        ArrayList<StateDirectory.TaskDirectory> taskDirs = new ArrayList<StateDirectory.TaskDirectory>(2);
        for (TaskId id : new TaskId[]{this.taskId00, this.taskId01}) {
            taskDirs.add(new StateDirectory.TaskDirectory(this.testFolder.newFolder(id.toString()), null));
        }
        Mockito.when((Object)this.stateDirectory.listNonEmptyTaskDirectories()).thenReturn(taskDirs).thenReturn(new ArrayList());
        this.expectLockObtainedFor(this.taskId00, this.taskId01);
        this.expectDirectoryNotEmpty(this.taskId00, this.taskId01);

        // The first rebalance start locks both directories.
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01})));

        // Bring both tasks to RUNNING.
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Losing all tasks closes only the active one.
        this.taskManager.handleLostAll();
        MatcherAssert.assertThat((Object)activeTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId01, standbyTask)));
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is((Object)Utils.mkSet((Object[])new TaskId[]{this.taskId00, this.taskId01})));

        // The second rebalance start sees no non-empty directories, so no locks are held afterwards.
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat((Object)this.taskManager.lockedTaskDirectories(), (Matcher)Matchers.is(Collections.emptySet()));

        ActiveTaskCreator creatorVerification = (ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator);
        creatorVerification.closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    /**
     * With a task manager set up for {@code EXACTLY_ONCE_V2}, {@code handleLostAll()} is expected
     * to ask the active task creator to re-initialize the thread producer.
     */
    @Test
    public void shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
        TaskManager eosTaskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);

        eosTaskManager.handleLostAll();

        ActiveTaskCreator creatorVerification = (ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator);
        creatorVerification.reInitializeThreadProducer();
    }

    /**
     * Stubs the active task creator so that releasing the task producer throws. When the revoked
     * task is dropped from the assignment, the task manager is expected to throw a
     * {@link RuntimeException} whose cause is the stubbed failure.
     */
    @Test
    public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
        StateMachineTask revokedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        revokedTask.setCommittableOffsetsAndMetadata(committable);
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(revokedTask));
        // Closing the producer for this task always fails.
        ((ActiveTaskCreator)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("KABOOM!")}).when((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)revokedTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);

        RuntimeException failure = Assert.assertThrows(RuntimeException.class, () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));

        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)Matchers.is((Object)"Encounter unexpected fatal error for task 0_0"));
        Throwable rootCause = failure.getCause();
        MatcherAssert.assertThat((Object)rootCause, (Matcher)Matchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat((Object)rootCause.getMessage(), (Matcher)Matchers.is((Object)"KABOOM!"));
    }

    /**
     * Reports one restoring active task and one running standby task as corrupted to a task
     * manager created with a mocked task registry (last {@code setUpTaskManager} argument
     * {@code true}). Each task must be closed dirty and then revived, in that order, removed from
     * the registry and re-added as pending initialization; the consumer assignment is also queried.
     */
    @Test
    public void shouldReAddRevivedTasksToStateUpdater() {
        StreamTask corruptedActiveTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        StandbyTask corruptedStandbyTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)registry.task(this.taskId03)).thenReturn((Object)corruptedActiveTask);
        Mockito.when((Object)registry.task(this.taskId02)).thenReturn((Object)corruptedStandbyTask);

        manager.handleCorruption(Utils.mkSet((Object[])new TaskId[]{corruptedActiveTask.id(), corruptedStandbyTask.id()}));

        // Each corrupted task is closed dirty before it is revived.
        InOrder activeSequence = Mockito.inOrder((Object[])new Object[]{corruptedActiveTask});
        ((StreamTask)activeSequence.verify((Object)corruptedActiveTask)).closeDirty();
        ((StreamTask)activeSequence.verify((Object)corruptedActiveTask)).revive();
        InOrder standbySequence = Mockito.inOrder((Object[])new Object[]{corruptedStandbyTask});
        ((StandbyTask)standbySequence.verify((Object)corruptedStandbyTask)).closeDirty();
        ((StandbyTask)standbySequence.verify((Object)corruptedStandbyTask)).revive();

        // Registry bookkeeping: removed, then queued again for initialization.
        ((TasksRegistry)Mockito.verify((Object)registry)).removeTask((Task)corruptedActiveTask);
        ((TasksRegistry)Mockito.verify((Object)registry)).removeTask((Task)corruptedStandbyTask);
        ((TasksRegistry)Mockito.verify((Object)registry)).addPendingTasksToInit((Collection)Utils.mkSet((Object[])new Task[]{corruptedActiveTask}));
        ((TasksRegistry)Mockito.verify((Object)registry)).addPendingTasksToInit((Collection)Utils.mkSet((Object[])new Task[]{corruptedStandbyTask}));
        ((Consumer)Mockito.verify(this.consumer)).assignment();
    }

    /**
     * Marks a running active task as corrupted and expects it to be revived: back in
     * {@code CREATED}, still tracked as an active task, with its input partitions recorded for
     * offset reset, a post-commit with an enforced checkpoint observed, and its changelog marked
     * as corrupted on the state manager.
     */
    @Test
    public void shouldReviveCorruptTasks() {
        ProcessorStateManager mockStateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        final AtomicBoolean sawEnforcedCheckpoint = new AtomicBoolean(false);
        StateMachineTask corruptedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, mockStateManager){

            @Override
            public void postCommit(boolean enforceCheckpoint) {
                // Record whether any post-commit requested an enforced checkpoint.
                if (enforceCheckpoint) {
                    sawEnforcedCheckpoint.set(true);
                }
                super.postCommit(enforceCheckpoint);
            }
        };
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn(this.taskId00Partitions);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(corruptedTask));

        // Bring the task to RUNNING; the reset callback, if invoked, must see no partitions.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), partitions -> MatcherAssert.assertThat((Object)partitions, (Matcher)Matchers.is((Matcher)Matchers.empty())));
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        corruptedTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        MatcherAssert.assertThat((Object)corruptedTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)corruptedTask.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        MatcherAssert.assertThat((Object)sawEnforcedCheckpoint.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId00, corruptedTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ProcessorStateManager stateManagerVerification = (ProcessorStateManager)Mockito.verify((Object)mockStateManager);
        stateManagerVerification.markChangelogAsCorrupted(this.taskId00Partitions);
    }

    /**
     * Same scenario as reviving a corrupted running task, but the task's {@code suspend()} throws
     * after delegating to the superclass. The task is still expected to end up revived in
     * {@code CREATED}, tracked as active, with its partitions recorded for offset reset and its
     * changelog marked as corrupted.
     */
    @Test
    public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
        ProcessorStateManager mockStateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        StateMachineTask corruptedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, mockStateManager){

            @Override
            public void suspend() {
                // Perform the real suspension, then fail anyway.
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn(this.taskId00Partitions);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(corruptedTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), partitions -> MatcherAssert.assertThat((Object)partitions, (Matcher)Matchers.is((Matcher)Matchers.empty())));
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        corruptedTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        MatcherAssert.assertThat((Object)corruptedTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)corruptedTask.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId00, corruptedTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ProcessorStateManager stateManagerVerification = (ProcessorStateManager)Mockito.verify((Object)mockStateManager);
        stateManagerVerification.markChangelogAsCorrupted(this.taskId00Partitions);
    }

    /**
     * With two running active tasks, only one of which is corrupted, handling the corruption is
     * expected to prepare a commit for the healthy task, record offset-reset partitions only for
     * the corrupted one, never call {@code commitSync} with an empty map, and mark the corrupted
     * task's changelog as corrupted.
     */
    @Test
    public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
        ProcessorStateManager mockStateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        StateMachineTask corrupted = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, mockStateManager);
        StateMachineTask healthy = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, mockStateManager);

        // Assignment covering both tasks.
        HashMap<TaskId, Set<TopicPartition>> bothTasks = new HashMap<TaskId, Set<TopicPartition>>(this.taskId00Assignment);
        bothTasks.putAll(this.taskId01Assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(bothTasks))).thenReturn(Arrays.asList(new Task[]{corrupted, healthy}));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn(this.taskId00Partitions);

        this.taskManager.handleAssignment(bothTasks, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), partitions -> MatcherAssert.assertThat((Object)partitions, (Matcher)Matchers.is((Matcher)Matchers.empty())));
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthy.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        healthy.setCommitNeeded();
        corrupted.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        Assert.assertTrue(healthy.commitPrepared);
        MatcherAssert.assertThat((Object)healthy.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(Collections.emptySet()));
        MatcherAssert.assertThat((Object)corrupted.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        ((Consumer)Mockito.verify(this.consumer, (VerificationMode)Mockito.never())).commitSync(Collections.emptyMap());
        ProcessorStateManager stateManagerVerification = (ProcessorStateManager)Mockito.verify((Object)mockStateManager);
        stateManagerVerification.markChangelogAsCorrupted(this.taskId00Partitions);
    }

    /**
     * Assigns two active tasks but never completes restoration, so the healthy task stays in
     * {@code CREATED}. Handling corruption of the other task must not prepare a commit for the
     * healthy one, even though it has a commit flagged as needed.
     */
    @Test
    public void shouldNotCommitNonRunningNonCorruptedTasks() {
        ProcessorStateManager mockStateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        StateMachineTask corrupted = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, mockStateManager);
        StateMachineTask healthyButNotRunning = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, mockStateManager);
        healthyButNotRunning.setCommitNeeded();

        // Assignment covering both tasks.
        HashMap<TaskId, Set<TopicPartition>> bothTasks = new HashMap<TaskId, Set<TopicPartition>>(this.taskId00Assignment);
        bothTasks.putAll(this.taskId01Assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(bothTasks))).thenReturn(Arrays.asList(new Task[]{corrupted, healthyButNotRunning}));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.taskId00Partitions);

        this.taskManager.handleAssignment(bothTasks, Collections.emptyMap());
        corrupted.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        MatcherAssert.assertThat((Object)healthyButNotRunning.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)healthyButNotRunning.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(Collections.emptySet()));
        MatcherAssert.assertThat((Object)corrupted.partitionsForOffsetReset, (Matcher)IsEqual.equalTo(this.taskId00Partitions));
        Assert.assertFalse(healthyButNotRunning.commitPrepared);
        ProcessorStateManager stateManagerVerification = (ProcessorStateManager)Mockito.verify((Object)mockStateManager);
        stateManagerVerification.markChangelogAsCorrupted(this.taskId00Partitions);
    }

    /**
     * Uses a task manager created with the last {@code setUpTaskManager} argument {@code true} and
     * a mocked registry that only exposes the corrupted task. Handling the corruption must not
     * touch the commit-related methods of either the restoring active task or the running standby
     * task.
     */
    @Test
    public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningStandbyTasksWithStateUpdaterEnabled() {
        StreamTask restoringActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        StandbyTask runningStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        StreamTask corrupted = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).withInputPartitions(this.taskId02Partitions).inState(Task.State.RUNNING).build();

        // The registry knows only about the corrupted task.
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.allTasksPerId()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId02, (Object)corrupted)}));
        Mockito.when((Object)registry.task(this.taskId02)).thenReturn((Object)corrupted);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, true);
        Mockito.when((Object)this.consumer.assignment()).thenReturn((Object)Utils.intersection(HashSet::new, this.taskId00Partitions, (Set[])new Set[]{this.taskId01Partitions, this.taskId02Partitions}));

        manager.handleCorruption(Utils.mkSet((Object[])new TaskId[]{this.taskId02}));

        // No commit activity on the restoring active task ...
        ((StreamTask)Mockito.verify((Object)restoringActive, (VerificationMode)Mockito.never())).commitNeeded();
        ((StreamTask)Mockito.verify((Object)restoringActive, (VerificationMode)Mockito.never())).prepareCommit();
        ((StreamTask)Mockito.verify((Object)restoringActive, (VerificationMode)Mockito.never())).postCommit(ArgumentMatchers.anyBoolean());
        // ... nor on the standby task.
        ((StandbyTask)Mockito.verify((Object)runningStandby, (VerificationMode)Mockito.never())).commitNeeded();
        ((StandbyTask)Mockito.verify((Object)runningStandby, (VerificationMode)Mockito.never())).prepareCommit();
        ((StandbyTask)Mockito.verify((Object)runningStandby, (VerificationMode)Mockito.never())).postCommit(ArgumentMatchers.anyBoolean());
    }

    /**
     * Uses a task manager created with the last {@code setUpTaskManager} argument {@code false}
     * and a mocked registry exposing all three tasks. Handling the corruption must leave the
     * restoring active task's commit methods untouched, while the running standby task (which
     * reports a needed commit) gets both {@code prepareCommit} and {@code postCommit}.
     */
    @Test
    public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled() {
        StreamTask restoringActive = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        StandbyTask runningStandby = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        Mockito.when((Object)runningStandby.commitNeeded()).thenReturn((Object)true);
        StreamTask corrupted = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).withInputPartitions(this.taskId02Partitions).inState(Task.State.RUNNING).build();

        // The registry exposes every task.
        TasksRegistry registry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        Mockito.when((Object)registry.allTasksPerId()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)restoringActive), Utils.mkEntry((Object)this.taskId01, (Object)runningStandby), Utils.mkEntry((Object)this.taskId02, (Object)corrupted)}));
        Mockito.when((Object)registry.task(this.taskId02)).thenReturn((Object)corrupted);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, registry, false);
        Mockito.when((Object)this.consumer.assignment()).thenReturn((Object)Utils.intersection(HashSet::new, this.taskId00Partitions, (Set[])new Set[]{this.taskId01Partitions, this.taskId02Partitions}));

        manager.handleCorruption(Utils.mkSet((Object[])new TaskId[]{this.taskId02}));

        // No commit activity on the restoring active task ...
        ((StreamTask)Mockito.verify((Object)restoringActive, (VerificationMode)Mockito.never())).commitNeeded();
        ((StreamTask)Mockito.verify((Object)restoringActive, (VerificationMode)Mockito.never())).prepareCommit();
        ((StreamTask)Mockito.verify((Object)restoringActive, (VerificationMode)Mockito.never())).postCommit(ArgumentMatchers.anyBoolean());
        // ... but the standby task is committed.
        ((StandbyTask)Mockito.verify((Object)runningStandby)).prepareCommit();
        ((StandbyTask)Mockito.verify((Object)runningStandby)).postCommit(ArgumentMatchers.anyBoolean());
    }

    /**
     * The healthy active task throws {@code TaskMigratedException} from {@code prepareCommit()}.
     * Even though {@code handleCorruption} therefore fails, the corrupted standby task must
     * already have been revived ({@code CREATED}, commit prepared) and its changelog marked as
     * corrupted.
     */
    @Test
    public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
        ProcessorStateManager mockStateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        StateMachineTask corruptedStandbyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, mockStateManager);
        StateMachineTask healthyActiveTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, mockStateManager){

            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                // Committing the healthy task always fails.
                throw new TaskMigratedException("You dropped out of the group!", (Throwable)new RuntimeException());
            }
        };
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId01Assignment))).thenReturn(Collections.singleton(healthyActiveTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(Collections.singleton(corruptedStandbyTask));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        // Bring both tasks to RUNNING.
        this.taskManager.handleAssignment(this.taskId01Assignment, this.taskId00Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthyActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)corruptedStandbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        healthyActiveTask.setCommitNeeded();
        corruptedStandbyTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        Assert.assertThrows(TaskMigratedException.class, () -> this.taskManager.handleCorruption(Collections.singleton(this.taskId00)));

        MatcherAssert.assertThat((Object)corruptedStandbyTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedStandbyTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        ProcessorStateManager stateManagerVerification = (ProcessorStateManager)Mockito.verify((Object)mockStateManager);
        stateManagerVerification.markChangelogAsCorrupted(this.taskId00Partitions);
    }

    /**
     * While a rebalance is in progress, handling the corruption of one active task must leave the
     * other active task's commit flags untouched (still needed, neither prepared nor completed)
     * and keep that task {@code RUNNING}.
     */
    @Test
    public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
        ProcessorStateManager mockStateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        Mockito.when((Object)this.stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList());
        StateMachineTask corruptedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, mockStateManager);
        StateMachineTask healthyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, mockStateManager);
        Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        healthyTask.setCommitNeeded();

        // Assignment covering both tasks.
        HashMap<TaskId, Set<TopicPartition>> initialAssignment = new HashMap<TaskId, Set<TopicPartition>>();
        initialAssignment.putAll(this.taskId00Assignment);
        initialAssignment.putAll(this.taskId01Assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(initialAssignment))).thenReturn(Arrays.asList(new Task[]{corruptedTask, healthyTask}));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        healthyTask.setCommittableOffsetsAndMetadata(committable);

        this.taskManager.handleAssignment(initialAssignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Baseline: a commit is pending but nothing has been prepared or completed.
        MatcherAssert.assertThat((Object)healthyTask.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)healthyTask.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthyTask.commitCompleted, (Matcher)Matchers.is((Object)false));

        // Start a rebalance, then report the corruption.
        this.taskManager.handleRebalanceStart(Collections.singleton("topic1"));
        MatcherAssert.assertThat((Object)this.taskManager.rebalanceInProgress(), (Matcher)Matchers.is((Object)true));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        // The healthy task's commit flags are exactly as before.
        MatcherAssert.assertThat((Object)healthyTask.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)healthyTask.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)healthyTask.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)healthyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
    }

    /**
     * The consumer is stubbed to throw {@link TimeoutException} from {@code commitSync} for the
     * uncorrupted task's offsets. After handling the corruption of the other task, both tasks are
     * expected to end up revived in {@code CREATED}, and only the corrupted task's changelog may
     * be marked as corrupted.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
        ProcessorStateManager stateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        StateMachineTask corruptedActive = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateManager);
        StateMachineTask uncorruptedActive = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateManager){

            // @Override makes the compiler reject this guard if the overridden signature ever drifts,
            // instead of silently leaving an uncalled method behind.
            @Override
            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                Assert.fail((String)"Should not try to mark changelogs as corrupted for uncorrupted task");
            }
        };
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);

        // Assignment covering both tasks.
        HashMap<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<TaskId, Set<TopicPartition>>();
        firstAssignment.putAll(this.taskId00Assignment);
        firstAssignment.putAll(this.taskId01Assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(firstAssignment))).thenReturn(Arrays.asList(new Task[]{corruptedActive, uncorruptedActive}));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        // Committing the uncorrupted task's offsets times out.
        ((Consumer)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException()}).when(this.consumer)).commitSync(offsets);

        // Bring both tasks to RUNNING.
        this.taskManager.handleAssignment(firstAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)corruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        uncorruptedActive.setCommitNeeded();
        corruptedActive.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));

        // Baseline commit flags before the corruption is handled.
        MatcherAssert.assertThat((Object)uncorruptedActive.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitCompleted, (Matcher)Matchers.is((Object)false));

        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));

        // The uncorrupted task got as far as preparing its commit, but the commit never completed.
        MatcherAssert.assertThat((Object)uncorruptedActive.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActive.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActive.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActive.commitCompleted, (Matcher)Matchers.is((Object)true));
        // Both tasks end up revived.
        MatcherAssert.assertThat((Object)corruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)uncorruptedActive.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        ((ProcessorStateManager)Mockito.verify((Object)stateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    /**
     * Exercises {@code handleCorruption} on a task manager set up with {@code EXACTLY_ONCE_V2} when the
     * producer's {@code commitTransaction} is stubbed to throw a {@code TimeoutException}.
     *
     * <p>Two active tasks are assigned and driven to {@code RUNNING}; {@code taskId00} is then reported as
     * corrupted. The test asserts that both tasks end up in {@code CREATED}, that both report their commit
     * as prepared and completed, and that {@code markChangelogAsCorrupted} was invoked on both tasks and on
     * the shared mocked state manager for both tasks' changelog partitions.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
        // Local task manager for EXACTLY_ONCE_V2; the active task creator hands out a mocked producer.
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        StreamsProducer producer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)this.activeTaskCreator.threadProducer()).thenReturn((Object)producer);
        // Both tasks share this mocked state manager so the calls can be verified at the end.
        ProcessorStateManager stateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        // Each anonymous subclass records that markChangelogAsCorrupted was called on that task.
        final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
        StateMachineTask corruptedActiveTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateManager){

            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                super.markChangelogAsCorrupted(partitions);
                corruptedTaskChangelogMarkedAsCorrupted.set(true);
            }
        };
        final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
        StateMachineTask uncorruptedActiveTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateManager){

            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                super.markChangelogAsCorrupted(partitions);
                uncorruptedTaskChangelogMarkedAsCorrupted.set(true);
            }
        };
        // Only the uncorrupted task is given committable offsets.
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
        HashMap<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<TaskId, Set<TopicPartition>>();
        firstAssignment.putAll(this.taskId00Assignment);
        firstAssignment.putAll(this.taskId01Assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(firstAssignment))).thenReturn(Arrays.asList(new Task[]{corruptedActiveTask, uncorruptedActiveTask}));
        // First assignment() call returns the fixture's assignment, later calls the union of both tasks' partitions.
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)groupMetadata);
        // Committing exactly these offsets with this group metadata times out.
        ((StreamsProducer)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException()}).when((Object)producer)).commitTransaction(offsets, groupMetadata);
        // Assign both tasks and bring them to RUNNING.
        taskManager.handleAssignment(firstAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)corruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // Only the uncorrupted task is flagged as needing a commit; both tasks get changelog offsets.
        uncorruptedActiveTask.setCommitNeeded();
        Map<TopicPartition, Long> corruptedActiveTaskChangelogOffsets = Collections.singletonMap(this.t1p0changelog, 0L);
        corruptedActiveTask.setChangelogOffsets(corruptedActiveTaskChangelogOffsets);
        Map<TopicPartition, Long> uncorruptedActiveTaskChangelogOffsets = Collections.singletonMap(this.t1p1changelog, 0L);
        uncorruptedActiveTask.setChangelogOffsets(uncorruptedActiveTaskChangelogOffsets);
        // Sanity check of the commit flags before corruption is handled.
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)false));
        // Action under test: report taskId00 as corrupted.
        taskManager.handleCorruption(Collections.singleton(this.taskId00));
        // Afterwards both tasks report prepared + completed commits and no pending commit ...
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitPrepared, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)corruptedActiveTask.commitCompleted, (Matcher)Matchers.is((Object)true));
        // ... both are back in CREATED ...
        MatcherAssert.assertThat((Object)corruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)uncorruptedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        // ... and both had their changelogs marked as corrupted, on the task and on the state manager.
        MatcherAssert.assertThat((Object)corruptedTaskChangelogMarkedAsCorrupted.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)uncorruptedTaskChangelogMarkedAsCorrupted.get(), (Matcher)Matchers.is((Object)true));
        ((ProcessorStateManager)Mockito.verify((Object)stateManager)).markChangelogAsCorrupted(this.taskId00ChangelogPartitions);
        ((ProcessorStateManager)Mockito.verify((Object)stateManager)).markChangelogAsCorrupted(this.taskId01ChangelogPartitions);
    }

    /**
     * Exercises {@code handleRevocation} on the fixture's shared task manager (presumably at-least-once,
     * going by the method name -- confirm against the fixture set-up) when the consumer's
     * {@code commitSync} is stubbed to throw a {@code TimeoutException}.
     *
     * <p>Three active tasks are assigned and driven to {@code RUNNING}; the partitions of {@code taskId00}
     * are then revoked. The test asserts that the revoked task is {@code SUSPENDED}, the unrevoked task
     * that was flagged as needing a commit is back in {@code CREATED}, and the unrevoked task without a
     * pending commit is still {@code RUNNING}.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS() {
        // Task whose partitions will be revoked; it has offsets to commit and is flagged as needing a commit.
        StateMachineTask revokedActiveTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
        revokedActiveTask.setCommitNeeded();
        // Unrevoked task that also needs a commit; the override fails the test if its changelog is marked corrupted.
        StateMachineTask unrevokedActiveTaskWithCommitNeeded = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager){

            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                Assert.fail((String)"Should not try to mark changelogs as corrupted for uncorrupted task");
            }
        };
        Map<TopicPartition, OffsetAndMetadata> offsets01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01);
        unrevokedActiveTaskWithCommitNeeded.setCommitNeeded();
        // Third task is never flagged as needing a commit.
        StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        // The stubbed commit below is keyed on the offsets of the two tasks flagged for commit.
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(offsets00);
        expectedCommittedOffsets.putAll(offsets01);
        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        // First assignment() call returns the fixture's assignment, later calls the union of all three tasks' partitions.
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions, this.taskId02Partitions}));
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignmentActive))).thenReturn(Arrays.asList(new Task[]{revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded}));
        // Committing exactly the expected offsets through the consumer times out.
        ((Consumer)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException()}).when(this.consumer)).commitSync(expectedCommittedOffsets);
        // Assign all three tasks and bring them to RUNNING.
        this.taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)revokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithoutCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // Action under test: revoke only taskId00's partitions.
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)revokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithoutCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
    }

    /**
     * Exercises {@code handleRevocation} on a task manager set up with {@code EXACTLY_ONCE_V2} when the
     * producer's {@code commitTransaction} is stubbed to throw a {@code TimeoutException}.
     *
     * <p>Three active tasks are assigned and driven to {@code RUNNING}; the partitions of {@code taskId00}
     * are then revoked. The test asserts that the revoked task is {@code SUSPENDED}, the unrevoked task
     * flagged as needing a commit is back in {@code CREATED} with its changelog marked as corrupted, the
     * unrevoked task without a pending commit is still {@code RUNNING}, and the shared mocked state
     * manager was asked to mark the changelog partitions of {@code taskId00} and {@code taskId01} as corrupted.
     */
    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
        // Local task manager for EXACTLY_ONCE_V2; the active task creator hands out a mocked producer.
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        StreamsProducer producer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)this.activeTaskCreator.threadProducer()).thenReturn((Object)producer);
        // All three tasks share this mocked state manager so the calls can be verified at the end.
        ProcessorStateManager stateManager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        // Task whose partitions will be revoked; it has offsets to commit and is flagged as needing a commit.
        StateMachineTask revokedActiveTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, stateManager);
        Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        revokedActiveTask.setCommittableOffsetsAndMetadata(revokedActiveTaskOffsets);
        revokedActiveTask.setCommitNeeded();
        // Unrevoked task that also needs a commit; the override records that markChangelogAsCorrupted was called.
        final AtomicBoolean unrevokedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
        StateMachineTask unrevokedActiveTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, stateManager){

            public void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
                super.markChangelogAsCorrupted(partitions);
                unrevokedTaskChangelogMarkedAsCorrupted.set(true);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> unrevokedTaskOffsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        unrevokedActiveTask.setCommittableOffsetsAndMetadata(unrevokedTaskOffsets);
        unrevokedActiveTask.setCommitNeeded();
        // Third task is never flagged as needing a commit.
        StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, stateManager);
        // The stubbed commit below is keyed on the offsets of the two tasks flagged for commit.
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
        expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        // First assignment() call returns the fixture's assignment, later calls the union of all three tasks' partitions.
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment).thenReturn((Object)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId00Partitions, this.taskId01Partitions, this.taskId02Partitions}));
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignmentActive))).thenReturn(Arrays.asList(new Task[]{revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded}));
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)groupMetadata);
        // Committing exactly the expected offsets with this group metadata times out.
        ((StreamsProducer)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException()}).when((Object)producer)).commitTransaction(expectedCommittedOffsets, groupMetadata);
        // Assign all three tasks and bring them to RUNNING.
        taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
        MatcherAssert.assertThat((Object)taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)revokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)unrevokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithoutCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // Give the two tasks flagged for commit their changelog offsets.
        Map<TopicPartition, Long> revokedActiveTaskChangelogOffsets = Collections.singletonMap(this.t1p0changelog, 0L);
        revokedActiveTask.setChangelogOffsets(revokedActiveTaskChangelogOffsets);
        Map<TopicPartition, Long> unrevokedActiveTaskChangelogOffsets = Collections.singletonMap(this.t1p1changelog, 0L);
        unrevokedActiveTask.setChangelogOffsets(unrevokedActiveTaskChangelogOffsets);
        // Action under test: revoke only taskId00's partitions.
        taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)unrevokedTaskChangelogMarkedAsCorrupted.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)revokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        MatcherAssert.assertThat((Object)unrevokedActiveTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)unrevokedActiveTaskWithoutCommitNeeded.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // The state manager must have been asked to mark the changelogs of both committing tasks as corrupted.
        ((ProcessorStateManager)Mockito.verify((Object)stateManager)).markChangelogAsCorrupted(this.taskId00ChangelogPartitions);
        ((ProcessorStateManager)Mockito.verify((Object)stateManager)).markChangelogAsCorrupted(this.taskId01ChangelogPartitions);
    }

    /**
     * Checks that a standby task which is missing from a subsequent assignment ends up
     * {@code CLOSED} and that the task manager then reports neither active nor standby tasks.
     */
    @Test
    public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
        final Map<TaskId, Set<TopicPartition>> nothingAssigned = Collections.emptyMap();
        final StateMachineTask standbyTask =
            new StateMachineTask(this.taskId00, this.taskId00Partitions, false, this.stateManager);
        final List<StateMachineTask> createdStandbys = Collections.singletonList(standbyTask);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(createdStandbys);

        // First rebalance: the task is handed out as a standby and restored.
        this.taskManager.handleAssignment(nothingAssigned, this.taskId00Assignment);
        final boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
        final Task.State stateAfterRestore = standbyTask.state();
        MatcherAssert.assertThat((Object)stateAfterRestore, (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Second rebalance: nothing is assigned any more.
        this.taskManager.handleAssignment(nothingAssigned, nothingAssigned);
        final Task.State stateAfterUnassign = standbyTask.state();
        MatcherAssert.assertThat((Object)stateAfterUnassign, (Matcher)Matchers.is((Object)Task.State.CLOSED));

        final Matcher isEmptyMap = Matchers.anEmptyMap();
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), isEmptyMap);
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
    }

    /**
     * Applies the same active + standby assignment twice and checks that both tasks are
     * {@code RUNNING} after each round, that the active task creator was called once with an
     * empty map, and that the consumer's {@code assignment()} and {@code resume(...)} were each
     * called exactly twice.
     */
    @Test
    public void shouldAddNonResumedSuspendedTasks() {
        final StateMachineTask activeTask =
            new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        final StateMachineTask standbyTask =
            new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager);
        final Matcher isRunning = Matchers.is((Object)Task.State.RUNNING);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment)))
            .thenReturn(Collections.singletonList(activeTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId01Assignment))
            .thenReturn(Collections.singletonList(standbyTask));

        // Round 0 creates the tasks, round 1 re-delivers the identical assignment.
        for (int round = 0; round < 2; round++) {
            this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
            final boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
            MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
            MatcherAssert.assertThat((Object)activeTask.state(), isRunning);
            MatcherAssert.assertThat((Object)standbyTask.state(), isRunning);
        }

        final VerificationMode twice = (VerificationMode)Mockito.times((int)2);
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(Collections.emptyMap()));
        ((Consumer)Mockito.verify(this.consumer, twice)).assignment();
        ((Consumer)Mockito.verify(this.consumer, (VerificationMode)Mockito.times((int)2))).resume(this.assignment);
    }

    /**
     * Re-assigns an already running active task with a different partition set and checks that
     * the task stays {@code RUNNING} and afterwards reports the new set as its input partitions.
     */
    @Test
    public void shouldUpdateInputPartitionsAfterRebalance() {
        final StateMachineTask activeTask =
            new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        final Matcher isRunning = Matchers.is((Object)Task.State.RUNNING);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment)))
            .thenReturn(Collections.singletonList(activeTask));

        // Initial assignment with the task's original partitions.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        final boolean restoredInitially = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restoredInitially, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), isRunning);

        // Rebalance: same task id, but now mapped to t1p1 only.
        Set updatedPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1});
        Map<TaskId, Set> reassignment = Collections.singletonMap(this.taskId00, updatedPartitions);
        this.taskManager.handleAssignment(reassignment, Collections.emptyMap());
        final boolean restoredAgain = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restoredAgain, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), isRunning);
        Assert.assertEquals((Object)updatedPartitions, (Object)activeTask.inputPartitions());

        ((Consumer)Mockito.verify(this.consumer, (VerificationMode)Mockito.times((int)2))).assignment();
        ((Consumer)Mockito.verify(this.consumer, (VerificationMode)Mockito.times((int)2))).resume(this.assignment);
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(Collections.emptyMap()));
    }

    /**
     * Assigns one new active task and checks that it is {@code CREATED} after the assignment,
     * {@code RUNNING} after restoration, listed as the only active task, and that the changelog
     * reader and consumer received the expected calls.
     */
    @Test
    public void shouldAddNewActiveTasks() {
        final Map<TaskId, Set<TopicPartition>> activeAssignment = this.taskId00Assignment;
        final StateMachineTask newTask =
            new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(activeAssignment)))
            .thenReturn(Collections.singletonList(newTask));

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        final Task.State stateAfterAssignment = newTask.state();
        MatcherAssert.assertThat((Object)stateAfterAssignment, (Matcher)Matchers.is((Object)Task.State.CREATED));

        // The offset resetter passed here deliberately does nothing.
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), ignoredPartitions -> {});
        final Task.State stateAfterRestore = newTask.state();
        MatcherAssert.assertThat((Object)stateAfterRestore, (Matcher)Matchers.is((Object)Task.State.RUNNING));

        final Map<TaskId, StateMachineTask> expectedActive = Collections.singletonMap(this.taskId00, newTask);
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo(expectedActive));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());

        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        ((Consumer)Mockito.verify(this.consumer)).assignment();
        ((Consumer)Mockito.verify(this.consumer)).resume((Collection)ArgumentMatchers.eq(Collections.emptySet()));
    }

    /**
     * Assigns two active tasks whose {@code initializeIfNeeded()} throws (a {@code LockException}
     * and a {@code TimeoutException} respectively) and checks that restoration reports
     * {@code false}, both tasks stay {@code CREATED} and remain registered as active tasks, and
     * the consumer is never touched.
     */
    @Test
    public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions)});
        final StateMachineTask lockFailingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public void initializeIfNeeded() {
                throw new LockException("can't lock");
            }
        };
        final StateMachineTask timingOutTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager){

            @Override
            public void initializeIfNeeded() {
                throw new TimeoutException("timed out");
            }
        };
        final StateMachineTask[] bothTasks = new StateMachineTask[]{lockFailingTask, timingOutTask};
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)activeAssignment)))
            .thenReturn(Arrays.asList(new Task[]{lockFailingTask, timingOutTask}));

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        for (final StateMachineTask task : bothTasks) {
            MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        }

        // Neither task can initialize, so restoration must not be reported as complete.
        final boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)false));
        for (final StateMachineTask task : bothTasks) {
            MatcherAssert.assertThat((Object)task.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        }

        Map expectedActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)lockFailingTask)), Utils.mkEntry((Object)this.taskId01, (Object)((Object)timingOutTask))});
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)expectedActive));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());

        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        Mockito.verifyNoInteractions((Object[])new Object[]{this.consumer});
    }

    /**
     * Assigns one active task whose {@code completeRestoration(...)} throws a
     * {@code TimeoutException} and checks that restoration reports {@code false}, the task is
     * left in {@code RESTORING} and still registered as active, and the consumer is never touched.
     */
    @Test
    public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions)});
        final StateMachineTask stuckTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
                throw new TimeoutException("timeout!");
            }
        };
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)activeAssignment)))
            .thenReturn(Collections.singletonList(stuckTask));

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        final Task.State stateAfterAssignment = stuckTask.state();
        MatcherAssert.assertThat((Object)stateAfterAssignment, (Matcher)Matchers.is((Object)Task.State.CREATED));

        // The task cannot finish restoring, so the overall result must be false.
        final boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)false));
        final Task.State stateAfterAttempt = stuckTask.state();
        MatcherAssert.assertThat((Object)stateAfterAttempt, (Matcher)Matchers.is((Object)Task.State.RESTORING));

        Map expectedActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)stuckTask))});
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)expectedActive));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());

        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        Mockito.verifyNoInteractions((Object[])new Object[]{this.consumer});
    }

    /**
     * Brings a single active task to {@code RUNNING}, revokes its partitions and checks that the
     * task is then {@code SUSPENDED}.
     */
    @Test
    public void shouldSuspendActiveTasksDuringRevocation() {
        final StateMachineTask activeTask =
            new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        final OffsetAndMetadata initialOffset = new OffsetAndMetadata(0L, null);
        final Map<TopicPartition, OffsetAndMetadata> committable = Collections.singletonMap(this.t1p0, initialOffset);
        activeTask.setCommittableOffsetsAndMetadata(committable);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment)))
            .thenReturn(Collections.singletonList(activeTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        final boolean restored = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restored, (Matcher)Matchers.is((Object)true));
        final Task.State stateBeforeRevocation = activeTask.state();
        MatcherAssert.assertThat((Object)stateBeforeRevocation, (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Revoking the task's own partitions must suspend it.
        this.taskManager.handleRevocation(this.taskId00Partitions);
        final Task.State stateAfterRevocation = activeTask.state();
        MatcherAssert.assertThat((Object)stateAfterRevocation, (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
    }

    /**
     * Exercises {@code handleRevocation} on a task manager set up with {@code EXACTLY_ONCE_V2}.
     *
     * <p>Three active tasks and one standby task are assigned and driven to {@code RUNNING}; only
     * {@code task00} and {@code task01} are flagged as needing a commit. After the partitions of
     * {@code taskId00} are revoked, the test asserts that neither flagged task still needs a commit, that
     * {@code task02} and the standby {@code task10} were not prepared for commit, and that the mocked
     * producer's {@code commitTransaction} was called with the combined offsets of {@code task00} and
     * {@code task01} and the consumer's group metadata.
     */
    @Test
    public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
        StreamsProducer producer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        // task00 and task01: committable offsets and flagged as needing a commit.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets00);
        task00.setCommitNeeded();
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        task01.setCommittableOffsetsAndMetadata(offsets01);
        task01.setCommitNeeded();
        // task02: has committable offsets but is never flagged as needing a commit.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets02 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, null));
        task02.setCommittableOffsetsAndMetadata(offsets02);
        // task10 is handed out through the standby task creator below.
        StateMachineTask task10 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false, this.stateManager);
        // Only the offsets of the two flagged tasks are expected in the transaction commit.
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(offsets00);
        expectedCommittedOffsets.putAll(offsets01);
        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        Map assignmentStandby = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId10, this.taskId10Partitions)});
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignmentActive))).thenReturn(Arrays.asList(new Task[]{task00, task01, task02}));
        Mockito.when((Object)this.activeTaskCreator.threadProducer()).thenReturn((Object)producer);
        Mockito.when((Object)this.standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(Collections.singletonList(task10));
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)groupMetadata);
        // NOTE(review): the return values of these calls are discarded and no effect is visible in this
        // method; they look like leftovers from an earlier mocking style -- confirm against StateMachineTask.
        task00.committedOffsets();
        task01.committedOffsets();
        task02.committedOffsets();
        task10.committedOffsets();
        // Assign all four tasks and bring them to RUNNING.
        taskManager.handleAssignment(assignmentActive, assignmentStandby);
        MatcherAssert.assertThat((Object)taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task10.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        // Action under test: revoke only taskId00's partitions.
        taskManager.handleRevocation(this.taskId00Partitions);
        // Both flagged tasks were committed; the unflagged active task and the standby were left alone.
        MatcherAssert.assertThat((Object)task00.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task01.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task02.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task10.commitPrepared, (Matcher)Matchers.is((Object)false));
        ((StreamsProducer)Mockito.verify((Object)producer)).commitTransaction(expectedCommittedOffsets, groupMetadata);
    }

    /**
     * Revoking the partitions of task00 is expected to commit every active task that was flagged
     * via {@code setCommitNeeded()} (task00 and task01 here). The unflagged active task02 and
     * task10, which is created through the standby task creator, must not get a prepared commit.
     * Only the offsets of task00 and task01 are expected in the {@code commitSync} call.
     */
    @Test
    public void shouldCommitAllNeededTasksOnHandleRevocation() {
        // task00 and task01: committable offsets set and flagged as needing a commit.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        task00.setCommittableOffsetsAndMetadata(offsets00);
        task00.setCommitNeeded();
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        task01.setCommittableOffsetsAndMetadata(offsets01);
        task01.setCommitNeeded();
        // task02: committable offsets set, but NOT flagged as needing a commit.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> offsets02 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, null));
        task02.setCommittableOffsetsAndMetadata(offsets02);
        StateMachineTask task10 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false, this.stateManager);
        // Only the offsets of the two flagged tasks are expected to be committed.
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(offsets00);
        expectedCommittedOffsets.putAll(offsets01);
        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        Map assignmentStandby = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId10, this.taskId10Partitions)});
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignmentActive))).thenReturn(Arrays.asList(new Task[]{task00, task01, task02}));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(Collections.singletonList(task10));
        this.taskManager.handleAssignment(assignmentActive, assignmentStandby);
        // Precondition: every task reaches RUNNING before the revocation is triggered.
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task10.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        // Both flagged tasks must have been committed: flag cleared and commit prepared.
        MatcherAssert.assertThat((Object)task00.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task00.commitPrepared, (Matcher)Matchers.is((Object)true));
        // Previously this line re-checked task00, leaving task01's flag unverified.
        MatcherAssert.assertThat((Object)task01.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task01.commitPrepared, (Matcher)Matchers.is((Object)true));
        // Tasks that never needed a commit must not have one prepared.
        MatcherAssert.assertThat((Object)task02.commitPrepared, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)task10.commitPrepared, (Matcher)Matchers.is((Object)false));
        ((Consumer)Mockito.verify(this.consumer)).commitSync(expectedCommittedOffsets);
    }

    /**
     * Applying the very same assignment a second time closes no task, so the commit that is
     * pending on the active task must stay pending and the other task must not get a
     * prepared commit.
     */
    @Test
    public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
        // Active task with committable offsets, flagged as needing a commit.
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> committableOffsets =
            Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(committableOffsets);
        activeTask.setCommitNeeded();
        StateMachineTask standbyTask = new StateMachineTask(this.taskId10, this.taskId10Partitions, false, this.stateManager);

        Map<TaskId, Set<TopicPartition>> activeAssignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map<TaskId, Set<TopicPartition>> standbyAssignment = Collections.singletonMap(this.taskId10, this.taskId10Partitions);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(activeAssignment)))
            .thenReturn(Collections.singleton(activeTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(standbyAssignment))
            .thenReturn(Collections.singletonList(standbyTask));

        // First assignment: both tasks are created and brought to RUNNING.
        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Second, identical assignment: nothing is closed, so nothing is committed.
        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat((Object)activeTask.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standbyTask.commitPrepared, (Matcher)Matchers.is((Object)false));
    }

    /**
     * Dropping only the standby assignment in a follow-up rebalance must not trigger a commit
     * of the active task: its {@code commitNeeded} flag is expected to remain set.
     */
    @Test
    public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
        // Active task with committable offsets, flagged as needing a commit.
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Map<TopicPartition, OffsetAndMetadata> committableOffsets =
            Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        activeTask.setCommittableOffsetsAndMetadata(committableOffsets);
        activeTask.setCommitNeeded();
        StateMachineTask standbyTask = new StateMachineTask(this.taskId10, this.taskId10Partitions, false, this.stateManager);

        Map<TaskId, Set<TopicPartition>> activeAssignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map<TaskId, Set<TopicPartition>> standbyAssignment = Collections.singletonMap(this.taskId10, this.taskId10Partitions);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(activeAssignment)))
            .thenReturn(Collections.singleton(activeTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(standbyAssignment))
            .thenReturn(Collections.singletonList(standbyTask));

        // Initial assignment brings both tasks to RUNNING.
        this.taskManager.handleAssignment(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // New assignment keeps the active task but no longer contains the standby task.
        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)activeTask.commitNeeded, (Matcher)Matchers.is((Object)true));
    }

    /**
     * Walks a task that never left the CREATED state through revocation and removal from the
     * assignment, checking the CREATED, SUSPENDED and CLOSED transitions and that the task
     * producer is closed for it at the end.
     */
    @Test
    public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
        StateMachineTask createdTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment)))
            .thenReturn(Collections.singletonList(createdTask));

        // Assigned but restoration is never attempted, so the task stays CREATED.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)createdTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        // Revocation suspends the task.
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat((Object)createdTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));

        // An empty assignment closes it.
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat((Object)createdTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));

        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    /**
     * A {@link RuntimeException} thrown from a task's {@code suspend()} during revocation must
     * reach the caller of {@code handleRevocation}; the task is still expected to end up
     * SUSPENDED because the override suspends before throwing.
     */
    @Test
    public void shouldPassUpIfExceptionDuringSuspend() {
        StateMachineTask explodingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){
            @Override
            public void suspend() {
                // Perform the real state transition first, then fail.
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment)))
            .thenReturn(Collections.singletonList(explodingTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)explodingTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        Assert.assertThrows(
            RuntimeException.class,
            () -> this.taskManager.handleRevocation(this.taskId00Partitions));
        MatcherAssert.assertThat((Object)explodingTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
    }

    /** Runs the shared clean-shutdown scenario with the AT_LEAST_ONCE processing mode. */
    @Test
    public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithAlos() {
        StreamsConfigUtils.ProcessingMode mode = StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
        this.shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(mode);
    }

    /**
     * Runs the shared clean-shutdown scenario with the EXACTLY_ONCE_ALPHA processing mode,
     * after stubbing a mock producer for every task id.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV1() {
        Mockito.when((Object)this.activeTaskCreator.streamsProducerForTask((TaskId)ArgumentMatchers.any()))
            .thenReturn(Mockito.mock(StreamsProducer.class));
        StreamsConfigUtils.ProcessingMode mode = StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
        this.shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(mode);
    }

    /**
     * Runs the shared clean-shutdown scenario with the EXACTLY_ONCE_V2 processing mode,
     * after stubbing a mock thread producer.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV2() {
        Mockito.when((Object)this.activeTaskCreator.threadProducer())
            .thenReturn(Mockito.mock(StreamsProducer.class));
        StreamsConfigUtils.ProcessingMode mode = StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
        this.shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(mode);
    }

    /**
     * Shared scenario for the clean-shutdown tests: four active tasks are assigned, three of
     * which throw from {@code suspend()}, and {@code shutdown(true)} is expected to rethrow
     * while still closing every task and releasing the producers.
     *
     * @param processingMode processing mode handed to {@code setUpTaskManager} to build the
     *                       task manager under test
     */
    private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(StreamsConfigUtils.ProcessingMode processingMode) {
        TaskManager taskManager = this.setUpTaskManager(processingMode, null, false);
        final TopicPartition changelog = new TopicPartition("changelog", 0);
        Map assignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions), Utils.mkEntry((Object)this.taskId03, this.taskId03Partitions)});
        // task00 is the only task that reports a changelog partition; it is asserted to be
        // RESTORING (not RUNNING) below, presumably because of that partition.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(changelog);
            }
        };
        // Flags recording that closeDirty() was invoked on each of the failing tasks.
        final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false);
        final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false);
        final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
        // task01 suspends and then throws a TaskMigratedException.
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager){

            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", (Throwable)new RuntimeException("cause"));
            }

            @Override
            public void closeDirty() {
                super.closeDirty();
                closedDirtyTask01.set(true);
            }
        };
        // task02 and task03 suspend and then throw a plain RuntimeException("oops").
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override
            public void closeDirty() {
                super.closeDirty();
                closedDirtyTask02.set(true);
            }
        };
        StateMachineTask task03 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true, this.stateManager){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override
            public void closeDirty() {
                super.closeDirty();
                closedDirtyTask03.set(true);
            }
        };
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignment))).thenReturn(Arrays.asList(new Task[]{task00, task01, task02, task03}));
        taskManager.handleAssignment(assignment, Collections.emptyMap());
        // Right after assignment every task is still CREATED.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task03.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        // After the restoration attempt, task00 is RESTORING and the others are RUNNING.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task03.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)task00)), Utils.mkEntry((Object)this.taskId01, (Object)((Object)task01)), Utils.mkEntry((Object)this.taskId02, (Object)((Object)task02)), Utils.mkEntry((Object)this.taskId03, (Object)((Object)task03))})));
        MatcherAssert.assertThat((Object)taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).completedChangelogs();
        // Clean shutdown must rethrow; the cause is expected to carry the "oops" message
        // rather than the TaskMigratedException raised by task01.
        RuntimeException exception = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
        MatcherAssert.assertThat((Object)exception.getCause().getMessage(), (Matcher)Matchers.is((Object)"oops"));
        // All three failing tasks must have been closed dirty.
        MatcherAssert.assertThat((Object)closedDirtyTask01.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)closedDirtyTask02.get(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)closedDirtyTask03.get(), (Matcher)Matchers.is((Object)true));
        // Regardless of the failures, every task ends up CLOSED and is dropped from the manager.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task03.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        // One task-producer close per task (four in total) plus a single thread-producer close.
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator, (VerificationMode)Mockito.times((int)4))).closeAndRemoveTaskProducerIfNeeded((TaskId)ArgumentMatchers.any());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    /**
     * When closing the task producer throws during a clean shutdown, the task must still be
     * closed and removed, and the producer failure must surface as the cause of the exception
     * thrown by {@code shutdown(true)}.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
        final TopicPartition changelogPartition = new TopicPartition("changelog", 0);
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions)});
        // The task reports one changelog partition; it is asserted to be RESTORING below.
        StateMachineTask restoringTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){
            @Override
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(changelogPartition);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> committableOffsets =
            Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        restoringTask.setCommittableOffsetsAndMetadata(committableOffsets);

        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)activeAssignment)))
            .thenReturn(Collections.singletonList(restoringTask));
        // Closing the producer of this task fails.
        ((ActiveTaskCreator)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("whatever")}).when((Object)this.activeTaskCreator))
            .closeAndRemoveTaskProducerIfNeeded(this.taskId00);

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)restoringTask))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).completedChangelogs();

        RuntimeException thrown = (RuntimeException)Assert.assertThrows(
            RuntimeException.class,
            () -> this.taskManager.shutdown(true));

        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        // The producer failure is expected as the cause, not as the top-level exception.
        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)Matchers.is((Object)"whatever"));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    /**
     * When closing the thread producer throws during a clean shutdown, the task must still be
     * closed and removed, and the exception thrown by {@code shutdown(true)} is expected to
     * carry the producer failure's message directly.
     */
    @Test
    public void shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
        final TopicPartition changelogPartition = new TopicPartition("changelog", 0);
        Map activeAssignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions)});
        // The task reports one changelog partition; it is asserted to be RESTORING below.
        StateMachineTask restoringTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){
            @Override
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(changelogPartition);
            }
        };

        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)activeAssignment)))
            .thenReturn(Collections.singletonList(restoringTask));
        // Closing the thread-level producer fails.
        ((ActiveTaskCreator)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("whatever")}).when((Object)this.activeTaskCreator))
            .closeThreadProducerIfNeeded();

        this.taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)restoringTask))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).completedChangelogs();

        RuntimeException thrown = (RuntimeException)Assert.assertThrows(
            RuntimeException.class,
            () -> this.taskManager.shutdown(true));

        MatcherAssert.assertThat((Object)restoringTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        // Here the message is checked on the thrown exception itself, not on its cause.
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)Matchers.is((Object)"whatever"));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    /**
     * A new assignment keeps task00 and drops task01, whose {@code prepareCommit()} throws.
     * The failure must surface as the cause of the exception from {@code handleAssignment},
     * task01 must end up CLOSED and only task00 may remain registered.
     */
    @Test
    public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
        // NOTE(review): the return value is discarded and the test then uses this.taskManager;
        // presumably this overload also updates the field. Confirm against the helper.
        this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, false);

        StateMachineTask keptTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, this.stateManager);
        StateMachineTask failingTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager){
            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("task 0_1 prepare commit boom!");
            }
        };
        failingTask.setCommitNeeded();

        this.taskManager.addTask((Task)keptTask);
        this.taskManager.addTask((Task)failingTask);

        // The new assignment contains task00 only (as a standby), so task01 is dropped.
        RuntimeException thrown = (RuntimeException)Assert.assertThrows(
            RuntimeException.class,
            () -> this.taskManager.handleAssignment(Collections.emptyMap(), Collections.singletonMap(this.taskId00, this.taskId00Partitions)));

        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)Matchers.is((Object)"task 0_1 prepare commit boom!"));
        MatcherAssert.assertThat((Object)keptTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.allTasks(), (Matcher)Matchers.is(Collections.singletonMap(this.taskId00, keptTask)));
    }

    /**
     * Revoking the partitions of task01 and task02 must suspend both even though task01 throws
     * from {@code suspend()}; the failure is expected as the cause of the rethrown exception,
     * task00 stays CREATED and the active task creator is never touched.
     */
    @Test
    public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
        StateMachineTask untouchedTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        StateMachineTask failingTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager){
            @Override
            public void suspend() {
                // Perform the real state transition first, then fail.
                super.suspend();
                throw new RuntimeException("task 0_1 suspend boom!");
            }
        };
        StateMachineTask healthyTask = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);

        this.taskManager.addTask((Task)untouchedTask);
        this.taskManager.addTask((Task)failingTask);
        this.taskManager.addTask((Task)healthyTask);

        // Revoke the union of the partitions of task01 and task02; task00 is not affected.
        RuntimeException thrown = (RuntimeException)Assert.assertThrows(
            RuntimeException.class,
            () -> this.taskManager.handleRevocation((Collection)Utils.union(HashSet::new, (Set[])new Set[]{this.taskId01Partitions, this.taskId02Partitions})));

        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)Matchers.is((Object)"task 0_1 suspend boom!"));
        MatcherAssert.assertThat((Object)untouchedTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        MatcherAssert.assertThat((Object)healthyTask.state(), (Matcher)Matchers.is((Object)Task.State.SUSPENDED));
        Mockito.verifyNoInteractions((Object[])new Object[]{this.activeTaskCreator});
    }

    /**
     * An unclean shutdown ({@code shutdown(false)}) must complete without throwing even though
     * two tasks throw from {@code suspend()} and both producer-closing calls on the active task
     * creator are stubbed to throw. All tasks are expected to end up CLOSED and removed.
     */
    @Test
    public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() {
        final TopicPartition changelog = new TopicPartition("changelog", 0);
        Map assignment = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        // task00 is the only task that reports a changelog partition; it is asserted to be
        // RESTORING (not RUNNING) below, presumably because of that partition.
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(changelog);
            }
        };
        // task01 suspends and then throws a TaskMigratedException.
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager){

            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", (Throwable)new RuntimeException("cause"));
            }
        };
        // task02 suspends and then throws a plain RuntimeException.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager){

            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignment))).thenReturn(Arrays.asList(new Task[]{task00, task01, task02}));
        // Both the per-task and the thread-level producer close are stubbed to fail.
        ((ActiveTaskCreator)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("whatever")}).when((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded((TaskId)ArgumentMatchers.any());
        ((ActiveTaskCreator)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("whatever all")}).when((Object)this.activeTaskCreator)).closeThreadProducerIfNeeded();
        this.taskManager.handleAssignment(assignment, Collections.emptyMap());
        // Right after assignment every task is still CREATED.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        // After the restoration attempt, task00 is RESTORING and the others are RUNNING.
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RESTORING));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, (Object)((Object)task00)), Utils.mkEntry((Object)this.taskId01, (Object)((Object)task01)), Utils.mkEntry((Object)this.taskId02, (Object)((Object)task02))})));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changeLogReader)).completedChangelogs();
        // Called outside assertThrows: the unclean shutdown must not propagate any failure.
        this.taskManager.shutdown(false);
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task01.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)task02.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        // One task-producer close per task (three in total) plus a single thread-producer close.
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator, (VerificationMode)Mockito.times((int)3))).closeAndRemoveTaskProducerIfNeeded((TaskId)ArgumentMatchers.any());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    /**
     * A task created through the standby task creator must be CLOSED and removed by a clean
     * shutdown; the thread producer is closed and the consumer is resumed with an empty set.
     */
    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        Map<TaskId, Set<TopicPartition>> standbyAssignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, this.stateManager);
        Mockito.when((Object)this.standbyTaskCreator.createTasks(standbyAssignment))
            .thenReturn(Collections.singletonList(standbyTask));

        // Assign as standby only; no active tasks.
        this.taskManager.handleAssignment(Collections.emptyMap(), standbyAssignment);
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.CREATED));

        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId00, standbyTask)));

        this.taskManager.shutdown(true);

        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeThreadProducerIfNeeded();
        // Consumer interactions expected over the whole test.
        ((Consumer)Mockito.verify(this.consumer)).assignment();
        ((Consumer)Mockito.verify(this.consumer)).resume((Collection)ArgumentMatchers.eq(Collections.emptySet()));
    }

    /**
     * With the state updater enabled, a clean shutdown must shut the state updater down and
     * close the failed stateful task it drains dirty (prepareCommit, suspend, closeDirty),
     * releasing that task's producer and the thread producer.
     */
    @Test
    public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() {
        TasksRegistry taskRegistry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        StreamTask statefulFailure = StreamsTestUtils.TaskBuilder
            .statefulTask(this.taskId01, this.taskId01ChangelogPartitions)
            .inState(Task.State.RESTORING)
            .build();
        StandbyTask standbyFailure = StreamsTestUtils.TaskBuilder
            .standbyTask(this.taskId02, this.taskId02ChangelogPartitions)
            .inState(Task.State.RUNNING)
            .build();
        // The state updater reports both tasks as failed when drained.
        Mockito.when((Object)this.stateUpdater.drainExceptionsAndFailedTasks())
            .thenReturn(Arrays.asList(
                new StateUpdater.ExceptionAndTask(new RuntimeException(), (Task)statefulFailure),
                new StateUpdater.ExceptionAndTask(new RuntimeException(), (Task)standbyFailure)));
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);

        manager.shutdown(true);

        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(statefulFailure.id());
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeThreadProducerIfNeeded();
        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).shutdown(Duration.ofMillis(Long.MAX_VALUE));
        // Only the stateful task's close sequence is verified here.
        ((StreamTask)Mockito.verify((Object)statefulFailure)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)statefulFailure)).suspend();
        ((StreamTask)Mockito.verify((Object)statefulFailure)).closeDirty();
    }

    /**
     * A clean shutdown must also shut down the scheduling task manager, passing a timeout of
     * {@code Long.MAX_VALUE} milliseconds.
     */
    @Test
    public void shouldShutdownSchedulingTaskManager() {
        TasksRegistry taskRegistry = (TasksRegistry)Mockito.mock(TasksRegistry.class);
        TaskManager manager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, taskRegistry, true, true);

        manager.shutdown(true);

        Duration expectedTimeout = Duration.ofMillis(Long.MAX_VALUE);
        ((DefaultTaskManager)Mockito.verify((Object)this.schedulingTaskManager)).shutdown(expectedTimeout);
    }

    /**
     * Verifies how {@code shutdown(true)} treats tasks that are still owned by the state updater.
     *
     * <p>Six tasks (three active, three standby) are returned by {@code stateUpdater.getTasks()} and
     * fall into three scenarios, distinguished by how their removal future completes:
     * <ul>
     *   <li>removal succeeds: the task is added to the tasks registry;</li>
     *   <li>removal completes with a result that carries an exception: the task is closed dirty;</li>
     *   <li>the removal future itself completes exceptionally and the task is returned by
     *       {@code drainExceptionsAndFailedTasks()}: the task is closed dirty.</li>
     * </ul>
     * For tasks closed dirty, the task producer is expected to be closed for the active tasks and
     * never for the standby tasks.
     */
    @Test
    public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
        TasksRegistry tasks = (TasksRegistry)Mockito.mock(TasksRegistry.class);

        // Scenario 1: tasks whose removal succeeds.
        StreamTask removedStatefulTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).build();
        StandbyTask removedStandbyTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).build();
        // Scenario 2: tasks whose removal result carries an exception.
        StreamTask removedFailedStatefulTask = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).build();
        StandbyTask removedFailedStandbyTask = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId04, this.taskId04ChangelogPartitions).inState(Task.State.RUNNING).build();
        // Scenario 3: tasks whose removal future fails and which are drained as failed tasks.
        StreamTask removedFailedStatefulTaskDuringRemoval = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId05, this.taskId05ChangelogPartitions).inState(Task.State.RESTORING).build();
        StandbyTask removedFailedStandbyTaskDuringRemoval = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.stateUpdater.getTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{removedStatefulTask, removedStandbyTask, removedFailedStatefulTask, removedFailedStandbyTask, removedFailedStatefulTaskDuringRemoval, removedFailedStandbyTaskDuringRemoval}));

        // One removal future per task; all six are parameterized the same way.
        CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedStatefulTask = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedStandbyTask = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStatefulTask = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStandbyTask = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStatefulTaskDuringRemoval = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStandbyTaskDuringRemoval = new CompletableFuture<StateUpdater.RemovedTaskResult>();
        Mockito.when((Object)this.stateUpdater.remove(removedStatefulTask.id())).thenReturn(futureForRemovedStatefulTask);
        Mockito.when((Object)this.stateUpdater.remove(removedStandbyTask.id())).thenReturn(futureForRemovedStandbyTask);
        Mockito.when((Object)this.stateUpdater.remove(removedFailedStatefulTask.id())).thenReturn(futureForRemovedFailedStatefulTask);
        Mockito.when((Object)this.stateUpdater.remove(removedFailedStandbyTask.id())).thenReturn(futureForRemovedFailedStandbyTask);
        Mockito.when((Object)this.stateUpdater.remove(removedFailedStatefulTaskDuringRemoval.id())).thenReturn(futureForRemovedFailedStatefulTaskDuringRemoval);
        Mockito.when((Object)this.stateUpdater.remove(removedFailedStandbyTaskDuringRemoval.id())).thenReturn(futureForRemovedFailedStandbyTaskDuringRemoval);
        Mockito.when((Object)this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(new StateUpdater.ExceptionAndTask((RuntimeException)((Object)new StreamsException("KABOOM!")), (Task)removedFailedStatefulTaskDuringRemoval), new StateUpdater.ExceptionAndTask((RuntimeException)((Object)new StreamsException("KABOOM!")), (Task)removedFailedStandbyTaskDuringRemoval)));
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

        // Complete every future before shutdown so that no removal is left pending.
        futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult((Task)removedStatefulTask));
        futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult((Task)removedStandbyTask));
        futureForRemovedFailedStatefulTask.complete(new StateUpdater.RemovedTaskResult((Task)removedFailedStatefulTask, (RuntimeException)((Object)new StreamsException("KABOOM!"))));
        futureForRemovedFailedStandbyTask.complete(new StateUpdater.RemovedTaskResult((Task)removedFailedStandbyTask, (RuntimeException)((Object)new StreamsException("KABOOM!"))));
        futureForRemovedFailedStatefulTaskDuringRemoval.completeExceptionally(new StreamsException("KABOOM!"));
        futureForRemovedFailedStandbyTaskDuringRemoval.completeExceptionally(new StreamsException("KABOOM!"));

        taskManager.shutdown(true);

        ((StateUpdater)Mockito.verify((Object)this.stateUpdater)).shutdown(Duration.ofMillis(Long.MAX_VALUE));
        // Scenario 1: successfully removed tasks are handed to the registry.
        ((TasksRegistry)Mockito.verify((Object)tasks)).addTask((Task)removedStatefulTask);
        ((TasksRegistry)Mockito.verify((Object)tasks)).addTask((Task)removedStandbyTask);
        // Scenario 2: removal result carried an exception.
        ((StreamTask)Mockito.verify((Object)removedFailedStatefulTask)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)removedFailedStatefulTask)).suspend();
        ((StreamTask)Mockito.verify((Object)removedFailedStatefulTask)).closeDirty();
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId03);
        ((StandbyTask)Mockito.verify((Object)removedFailedStandbyTask)).prepareCommit();
        ((StandbyTask)Mockito.verify((Object)removedFailedStandbyTask)).suspend();
        ((StandbyTask)Mockito.verify((Object)removedFailedStandbyTask)).closeDirty();
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator, (VerificationMode)Mockito.never())).closeAndRemoveTaskProducerIfNeeded(this.taskId04);
        // Scenario 3: removal future failed and the task was drained as failed.
        ((StreamTask)Mockito.verify((Object)removedFailedStatefulTaskDuringRemoval)).prepareCommit();
        ((StreamTask)Mockito.verify((Object)removedFailedStatefulTaskDuringRemoval)).suspend();
        ((StreamTask)Mockito.verify((Object)removedFailedStatefulTaskDuringRemoval)).closeDirty();
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId05);
        ((StandbyTask)Mockito.verify((Object)removedFailedStandbyTaskDuringRemoval)).prepareCommit();
        ((StandbyTask)Mockito.verify((Object)removedFailedStandbyTaskDuringRemoval)).suspend();
        ((StandbyTask)Mockito.verify((Object)removedFailedStandbyTaskDuringRemoval)).closeDirty();
        ((ActiveTaskCreator)Mockito.verify((Object)this.activeTaskCreator, (VerificationMode)Mockito.never())).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    /**
     * Verifies that a task produced by the active task creator during {@code handleAssignment} is
     * brought to {@code RUNNING} by {@code tryToCompleteRestoration}, shows up only in the active
     * task map, and that the consumer is resumed for the current assignment.
     */
    @Test
    public void shouldInitializeNewActiveTasks() {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(activeTask));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);

        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId00, activeTask)));
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.anEmptyMap());
        ((Consumer)Mockito.verify(this.consumer)).resume(this.assignment);
    }

    /**
     * Verifies that a task produced by the standby task creator during {@code handleAssignment} is
     * brought to {@code RUNNING} by {@code tryToCompleteRestoration} and shows up only in the
     * standby task map.
     */
    @Test
    public void shouldInitialiseNewStandbyTasks() {
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager);
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(standbyTask));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);

        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)this.taskManager.activeTaskMap(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.standbyTaskMap(), (Matcher)Matchers.equalTo(Collections.singletonMap(this.taskId01, standbyTask)));
    }

    /**
     * Verifies that {@code rebalanceInProgress()} is {@code false} initially, {@code true} between
     * {@code handleRebalanceStart} and {@code handleRebalanceComplete}, and {@code false} again
     * afterwards, and that the consumer is paused for the current assignment.
     */
    @Test
    public void shouldHandleRebalanceEvents() {
        Mockito.when((Object)this.stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<Object>());
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        boolean inProgressInitially = this.taskManager.rebalanceInProgress();
        MatcherAssert.assertThat((Object)inProgressInitially, (Matcher)Matchers.is((Object)false));

        this.taskManager.handleRebalanceStart(Collections.emptySet());
        boolean inProgressAfterStart = this.taskManager.rebalanceInProgress();
        MatcherAssert.assertThat((Object)inProgressAfterStart, (Matcher)Matchers.is((Object)true));

        this.taskManager.handleRebalanceComplete();
        boolean inProgressAfterComplete = this.taskManager.rebalanceInProgress();
        MatcherAssert.assertThat((Object)inProgressAfterComplete, (Matcher)Matchers.is((Object)false));

        ((Consumer)Mockito.verify(this.consumer)).pause(this.assignment);
    }

    /**
     * Verifies that {@code commitAll()} reports two committed tasks when one active and one standby
     * task both need a commit, clears their commit-needed flags, and commits the active task's
     * offsets through the consumer.
     */
    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        // Arrange: an active task with committable offsets and a standby task.
        Map<TopicPartition, OffsetAndMetadata> activeOffsets = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        activeTask.setCommittableOffsetsAndMetadata(activeOffsets);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager);
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(activeTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(standbyTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Act: flag both tasks and commit everything.
        activeTask.setCommitNeeded();
        standbyTask.setCommitNeeded();
        int committedCount = this.taskManager.commitAll();

        // Assert
        MatcherAssert.assertThat((Object)committedCount, (Matcher)IsEqual.equalTo((Object)2));
        MatcherAssert.assertThat((Object)activeTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)standbyTask.commitNeeded, (Matcher)Matchers.is((Object)false));
        ((Consumer)Mockito.verify(this.consumer)).commitSync(activeOffsets);
    }

    /**
     * Verifies that {@code commit(Collection)} only touches the tasks it is given.
     *
     * <p>Tasks 00, 01, 03 and 04 are flagged as needing a commit, but only tasks 00, 02, 03 and 05
     * are passed in. The expected result is a count of two, with the flags of 01 and 04 still set
     * and every other flag cleared.
     */
    @Test
    public void shouldCommitProvidedTasksIfNeeded() {
        // Arrange: three tasks handed out by the active creator, three by the standby creator.
        StateMachineTask active00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        StateMachineTask active01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        StateMachineTask active02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        StateMachineTask standby03 = new StateMachineTask(this.taskId03, this.taskId03Partitions, false, this.stateManager);
        StateMachineTask standby04 = new StateMachineTask(this.taskId04, this.taskId04Partitions, false, this.stateManager);
        StateMachineTask standby05 = new StateMachineTask(this.taskId05, this.taskId05Partitions, false, this.stateManager);
        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions)});
        Map assignmentStandby = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId03, this.taskId03Partitions), Utils.mkEntry((Object)this.taskId04, this.taskId04Partitions), Utils.mkEntry((Object)this.taskId05, this.taskId05Partitions)});
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignmentActive))).thenReturn(Arrays.asList(new Task[]{active00, active01, active02}));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(Arrays.asList(new Task[]{standby03, standby04, standby05}));

        this.taskManager.handleAssignment(assignmentActive, assignmentStandby);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)active00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)active01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Act: flag four tasks, then commit a subset that overlaps only partially with them.
        active00.setCommitNeeded();
        active01.setCommitNeeded();
        standby03.setCommitNeeded();
        standby04.setCommitNeeded();
        int committedCount = this.taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{active00, active02, standby03, standby05}));

        // Assert: only the flagged tasks that were also provided are committed.
        MatcherAssert.assertThat((Object)committedCount, (Matcher)IsEqual.equalTo((Object)2));
        MatcherAssert.assertThat((Object)active00.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)active01.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)active02.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)standby03.commitNeeded, (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)standby04.commitNeeded, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standby05.commitNeeded, (Matcher)Matchers.is((Object)false));
    }

    /**
     * Verifies that {@code commitAll()} with a single standby task that needs a commit reports one
     * committed task and clears the task's commit-needed flag. No consumer commit is stubbed or
     * verified in this test.
     */
    @Test
    public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() {
        StateMachineTask standbyTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, this.stateManager);
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(Collections.singletonList(standbyTask));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        standbyTask.setCommitNeeded();
        int committedCount = this.taskManager.commitAll();

        MatcherAssert.assertThat((Object)committedCount, (Matcher)IsEqual.equalTo((Object)1));
        MatcherAssert.assertThat((Object)standbyTask.commitNeeded, (Matcher)Matchers.is((Object)false));
    }

    /**
     * Verifies that, once {@code handleRebalanceStart} has been called, both {@code commitAll()} and
     * {@code maybeCommitActiveTasksPerUserRequested()} return {@code -1} even though an active and
     * a standby task are flagged as needing a commit.
     */
    @Test
    public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws Exception {
        StateMachineTask activeTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        StateMachineTask standbyTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager);

        // Prepare the task directories and the related expectations for both task ids.
        this.makeTaskFolders(this.taskId00.toString(), this.taskId01.toString());
        this.expectDirectoryNotEmpty(this.taskId00, this.taskId01);
        this.expectLockObtainedFor(this.taskId00, this.taskId01);

        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(activeTask));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(standbyTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)activeTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standbyTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        activeTask.setCommitNeeded();
        standbyTask.setCommitNeeded();
        this.taskManager.handleRebalanceStart(Collections.emptySet());

        int commitAllResult = this.taskManager.commitAll();
        MatcherAssert.assertThat((Object)commitAllResult, (Matcher)IsEqual.equalTo((Object)-1));
        int userRequestedResult = this.taskManager.maybeCommitActiveTasksPerUserRequested();
        MatcherAssert.assertThat((Object)userRequestedResult, (Matcher)IsEqual.equalTo((Object)-1));
    }

    /**
     * Verifies that, with the default task manager of this test class, {@code commitAll()} commits
     * a task's committable offsets via {@code consumer.commitSync}.
     */
    @Test
    public void shouldCommitViaConsumerIfEosDisabled() {
        Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        StateMachineTask activeTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        activeTask.setCommittableOffsetsAndMetadata(committableOffsets);
        activeTask.setCommitNeeded();
        this.taskManager.addTask((Task)activeTask);

        this.taskManager.commitAll();

        ((Consumer)Mockito.verify(this.consumer)).commitSync(committableOffsets);
    }

    /**
     * Verifies that in {@code EXACTLY_ONCE_ALPHA} mode each task's offsets are committed in a
     * separate {@code commitTransaction} call on the producer returned by
     * {@code streamsProducerForTask}, and that nothing else is invoked on that producer.
     */
    @Test
    public void shouldCommitViaProducerIfEosAlphaEnabled() {
        // The same producer mock is returned for every task id.
        StreamsProducer taskProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)this.activeTaskCreator.streamsProducerForTask((TaskId)ArgumentMatchers.any(TaskId.class))).thenReturn((Object)taskProducer);
        Map<TopicPartition, OffsetAndMetadata> firstTaskOffsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> secondTaskOffsets = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, null));

        this.shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, firstTaskOffsets, secondTaskOffsets);

        // One transaction commit per task.
        ((StreamsProducer)Mockito.verify((Object)taskProducer)).commitTransaction(firstTaskOffsets, new ConsumerGroupMetadata("appId"));
        ((StreamsProducer)Mockito.verify((Object)taskProducer)).commitTransaction(secondTaskOffsets, new ConsumerGroupMetadata("appId"));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{taskProducer});
    }

    /**
     * Verifies that in {@code EXACTLY_ONCE_V2} mode the offsets of both tasks are committed in a
     * single {@code commitTransaction} call on the thread producer, and that nothing else is
     * invoked on that producer.
     */
    @Test
    public void shouldCommitViaProducerIfEosV2Enabled() {
        StreamsProducer threadProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)this.activeTaskCreator.threadProducer()).thenReturn((Object)threadProducer);
        Map<TopicPartition, OffsetAndMetadata> firstTaskOffsets = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> secondTaskOffsets = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, null));
        // Union of both tasks' offsets: the argument expected in the single transaction commit.
        HashMap<TopicPartition, OffsetAndMetadata> combinedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        combinedOffsets.putAll(firstTaskOffsets);
        combinedOffsets.putAll(secondTaskOffsets);

        this.shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, firstTaskOffsets, secondTaskOffsets);

        ((StreamsProducer)Mockito.verify((Object)threadProducer)).commitTransaction(combinedOffsets, new ConsumerGroupMetadata("appId"));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{threadProducer});
    }

    /**
     * Shared driver for the EOS commit tests: builds a task manager for the given processing mode,
     * registers two tasks that need a commit (ids {@code taskId01} and {@code taskId02}) and calls
     * {@code commitAll()}.
     *
     * @param processingMode processing mode passed to {@code setUpTaskManager}
     * @param offsetsT01     committable offsets assigned to the task with id {@code taskId01}
     * @param offsetsT02     committable offsets assigned to the task with id {@code taskId02}
     */
    private void shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode processingMode, Map<TopicPartition, OffsetAndMetadata> offsetsT01, Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
        TaskManager manager = this.setUpTaskManager(processingMode, false);

        StateMachineTask firstTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        firstTask.setCommittableOffsetsAndMetadata(offsetsT01);
        firstTask.setCommitNeeded();
        manager.addTask((Task)firstTask);

        StateMachineTask secondTask = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        secondTask.setCommittableOffsetsAndMetadata(offsetsT02);
        secondTask.setCommitNeeded();
        manager.addTask((Task)secondTask);

        // The callers verify commitTransaction against an equal ConsumerGroupMetadata("appId").
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
        Mockito.when((Object)this.consumer.groupMetadata()).thenReturn((Object)groupMetadata);

        manager.commitAll();
    }

    /**
     * Verifies that a {@code RuntimeException} thrown from {@code prepareCommit()} of a task created
     * by the active task creator propagates out of {@code commitAll()} with its message intact.
     */
    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        // A task whose prepareCommit() always fails.
        StateMachineTask failingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(failingTask));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        failingTask.setCommitNeeded();
        RuntimeException propagated = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        });

        MatcherAssert.assertThat((Object)propagated.getMessage(), (Matcher)IsEqual.equalTo((Object)"opsh."));
    }

    /**
     * Verifies that a {@code RuntimeException} thrown from {@code prepareCommit()} of a task created
     * by the standby task creator propagates out of {@code commitAll()} with its message intact.
     */
    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        // A task whose prepareCommit() always fails.
        StateMachineTask failingTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false, this.stateManager){

            @Override
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };
        Mockito.when((Object)this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(failingTask));
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);

        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)failingTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        failingTask.setCommitNeeded();
        RuntimeException propagated = (RuntimeException)Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        });

        MatcherAssert.assertThat((Object)propagated.getMessage(), (Matcher)IsEqual.equalTo((Object)"opsh."));
    }

    /**
     * Verifies that two consecutive {@code maybePurgeCommittedRecords()} calls each issue a
     * {@code deleteRecords} request reflecting the task's current purgeable offsets (first 5, then
     * 17), in that order, when the result of the earlier request is built from
     * {@code completedFuture()}.
     */
    @Test
    public void shouldSendPurgeData() {
        Map<TopicPartition, RecordsToDelete> deleteBeforeFive = Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset((long)5L));
        Map<TopicPartition, RecordsToDelete> deleteBeforeSeventeen = Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset((long)17L));
        Mockito.when((Object)this.adminClient.deleteRecords(deleteBeforeFive)).thenReturn((Object)new DeleteRecordsResult(Collections.singletonMap(this.t1p1, TaskManagerTest.completedFuture())));
        Mockito.when((Object)this.adminClient.deleteRecords(deleteBeforeSeventeen)).thenReturn((Object)new DeleteRecordsResult(Collections.singletonMap(this.t1p1, TaskManagerTest.completedFuture())));
        InOrder purgeOrder = Mockito.inOrder((Object[])new Object[]{this.adminClient});

        // Mutable map exposed by the task so the test can change the purgeable offsets between calls.
        final Map<TopicPartition, Long> offsetsToPurge = new HashMap<TopicPartition, Long>();
        StateMachineTask purgingTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public Map<TopicPartition, Long> purgeableOffsets() {
                return offsetsToPurge;
            }
        };
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(purgingTask));

        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)purgingTask.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // First purge at offset 5, second at offset 17.
        offsetsToPurge.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();
        offsetsToPurge.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();

        ((Admin)purgeOrder.verify((Object)this.adminClient)).deleteRecords(deleteBeforeFive);
        ((Admin)purgeOrder.verify((Object)this.adminClient)).deleteRecords(deleteBeforeSeventeen);
        purgeOrder.verifyNoMoreInteractions();
    }

    /**
     * Verifies that a second {@code maybePurgeCommittedRecords()} call does not issue a new
     * {@code deleteRecords} request while the future of the previous request is still incomplete.
     *
     * <p>The first request (offset 5) is stubbed with a future that is never completed. The second
     * call, made after the purgeable offset has moved to 17, must therefore not reach the admin
     * client. This is asserted explicitly, because an unstubbed call on the mock would otherwise
     * return {@code null} and go unnoticed.
     */
    @Test
    public void shouldNotSendPurgeDataIfPreviousNotDone() {
        // Never completed, so the first purge request stays in flight for the whole test.
        KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<DeletedRecords>();
        Mockito.when((Object)this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset((long)5L)))).thenReturn((Object)new DeleteRecordsResult(Collections.singletonMap(this.t1p1, futureDeletedRecords)));
        // Mutable map exposed by the task so the test can change the purgeable offsets between calls.
        final HashMap<TopicPartition, Long> purgableOffsets = new HashMap<TopicPartition, Long>();
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager){

            @Override
            public Map<TopicPartition, Long> purgeableOffsets() {
                return purgableOffsets;
            }
        };
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(task00));
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        purgableOffsets.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();
        // This call is expected to be a no-op because the previous request has not finished.
        purgableOffsets.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();

        // Exactly one request for offset 5, and none for offset 17.
        ((Admin)Mockito.verify((Object)this.adminClient)).deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset((long)5L)));
        ((Admin)Mockito.verify((Object)this.adminClient, (VerificationMode)Mockito.never())).deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset((long)17L)));
    }

    /**
     * Verifies that {@code maybePurgeCommittedRecords()} tolerates a failed {@code deleteRecords}
     * request: every request is stubbed to return a result whose future has completed
     * exceptionally, and two consecutive purge calls must not throw.
     *
     * <p>There are no explicit assertions on the purge calls; the test passes as long as neither
     * call raises an exception.
     */
    @Test
    public void shouldIgnorePurgeDataErrors() {
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        // A delete result whose only future has already failed.
        KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<DeletedRecords>();
        DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(this.t1p1, futureDeletedRecords));
        futureDeletedRecords.completeExceptionally((Throwable)new Exception("KABOOM!"));
        Mockito.when((Object)this.adminClient.deleteRecords((Map)ArgumentMatchers.any())).thenReturn((Object)deleteRecordsResult);
        this.taskManager.addTask((Task)task00);
        MatcherAssert.assertThat((Object)this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)task00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        task00.setPurgeableOffsets(Collections.singletonMap(this.t1p1, 5L));
        // Two calls: the second one runs after the first request has already failed.
        this.taskManager.maybePurgeCommittedRecords();
        this.taskManager.maybePurgeCommittedRecords();
    }

    /**
     * Verifies the result of {@code maybeCommitActiveTasksPerUserRequested()} for a mix of tasks
     * with different commit-needed / commit-requested flags.
     *
     * <p>Four tasks come from the active task creator and one from the standby task creator. The
     * call is expected to return 3 and to commit, via {@code consumer.commitSync}, exactly the
     * offsets configured on the tasks with ids {@code taskId00} and {@code taskId01}.
     */
    @Test
    public void shouldMaybeCommitAllActiveTasksThatNeedCommit() {
        // Arrange: tasks 00, 01 and 02 carry committable offsets; task 03 and the standby task do not.
        Map<TopicPartition, OffsetAndMetadata> offsetsOf00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsOf01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsOf02 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, null));
        StateMachineTask active00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        active00.setCommittableOffsetsAndMetadata(offsetsOf00);
        StateMachineTask active01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        active01.setCommittableOffsetsAndMetadata(offsetsOf01);
        StateMachineTask active02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        active02.setCommittableOffsetsAndMetadata(offsetsOf02);
        StateMachineTask active03 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true, this.stateManager);
        StateMachineTask standby10 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false, this.stateManager);

        // Offsets expected in the consumer commit: those of tasks 00 and 01 only.
        HashMap<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        expectedCommittedOffsets.putAll(offsetsOf00);
        expectedCommittedOffsets.putAll(offsetsOf01);

        Map assignmentActive = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId00, this.taskId00Partitions), Utils.mkEntry((Object)this.taskId01, this.taskId01Partitions), Utils.mkEntry((Object)this.taskId02, this.taskId02Partitions), Utils.mkEntry((Object)this.taskId03, this.taskId03Partitions)});
        Map assignmentStandby = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.taskId10, this.taskId10Partitions)});
        Mockito.when((Object)this.consumer.assignment()).thenReturn(this.assignment);
        Mockito.when((Object)this.activeTaskCreator.createTasks((Consumer)ArgumentMatchers.any(), (Map)ArgumentMatchers.eq((Object)assignmentActive))).thenReturn(Arrays.asList(new Task[]{active00, active01, active02, active03}));
        Mockito.when((Object)this.standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(Collections.singletonList(standby10));

        this.taskManager.handleAssignment(assignmentActive, assignmentStandby);
        boolean restorationDone = this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), null);
        MatcherAssert.assertThat((Object)restorationDone, (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)active00.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)active01.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)active02.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)active03.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));
        MatcherAssert.assertThat((Object)standby10.state(), (Matcher)Matchers.is((Object)Task.State.RUNNING));

        // Flags: 00 needed+requested, 01 needed only, 02 requested only, 03 and standby needed+requested.
        active00.setCommitNeeded();
        active00.setCommitRequested();
        active01.setCommitNeeded();
        active02.setCommitRequested();
        active03.setCommitNeeded();
        active03.setCommitRequested();
        standby10.setCommitNeeded();
        standby10.setCommitRequested();

        int committedCount = this.taskManager.maybeCommitActiveTasksPerUserRequested();

        MatcherAssert.assertThat((Object)committedCount, (Matcher)IsEqual.equalTo((Object)3));
        ((Consumer)Mockito.verify(this.consumer)).commitSync(expectedCommittedOffsets);
    }

    /**
     * Assigns two active tasks, drives them to RUNNING, and buffers six records on the first
     * and five on the second. Three successive {@code process(3, time)} calls are then expected
     * to report 6, 5 and 0 processed records.
     */
    @Test
    public void shouldProcessActiveTasks() {
        StateMachineTask sixRecordTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
        StateMachineTask fiveRecordTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);

        Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<>();
        activeAssignment.put(taskId00, taskId00Partitions);
        activeAssignment.put(taskId01, taskId01Partitions);

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(activeAssignment)))
            .thenReturn(Arrays.<Task>asList(sixRecordTask, fiveRecordTask));

        taskManager.handleAssignment(activeAssignment, Collections.emptyMap());
        MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(sixRecordTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(fiveRecordTask.state(), Matchers.is(Task.State.RUNNING));

        // Offsets 0..5 on t1p0 for the first task.
        sixRecordTask.addRecords(t1p0, Arrays.asList(
            getConsumerRecord(t1p0, 0L),
            getConsumerRecord(t1p0, 1L),
            getConsumerRecord(t1p0, 2L),
            getConsumerRecord(t1p0, 3L),
            getConsumerRecord(t1p0, 4L),
            getConsumerRecord(t1p0, 5L)));
        // Offsets 0..4 on t1p1 for the second task.
        fiveRecordTask.addRecords(t1p1, Arrays.asList(
            getConsumerRecord(t1p1, 0L),
            getConsumerRecord(t1p1, 1L),
            getConsumerRecord(t1p1, 2L),
            getConsumerRecord(t1p1, 3L),
            getConsumerRecord(t1p1, 4L)));

        MatcherAssert.assertThat(taskManager.process(3, time), Matchers.is(6));
        MatcherAssert.assertThat(taskManager.process(3, time), Matchers.is(5));
        MatcherAssert.assertThat(taskManager.process(3, time), Matchers.is(0));
    }

    /**
     * Registers three RUNNING tasks with two buffered records each; the middle task's
     * {@code process} throws the {@link TimeoutException} held in a shared reference for as long
     * as that reference is non-null.
     *
     * <p>Expected: the first {@code process(1, time)} call reports 2 and the middle task's
     * {@code timeout} equals the current mock time; after the reference is cleared the next call
     * reports 3 and {@code timeout} is null; the final call reports 1.
     */
    @Test
    public void shouldNotFailOnTimeoutException() {
        final AtomicReference<TimeoutException> pendingTimeout =
            new AtomicReference<>(new TimeoutException("Skip me!"));

        StateMachineTask firstTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
        firstTask.transitionTo(Task.State.RESTORING);
        firstTask.transitionTo(Task.State.RUNNING);

        StateMachineTask timingOutTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
            @Override
            public boolean process(long wallClockTime) {
                TimeoutException toThrow = pendingTimeout.get();
                if (toThrow == null) {
                    return true;
                }
                throw toThrow;
            }
        };
        timingOutTask.transitionTo(Task.State.RESTORING);
        timingOutTask.transitionTo(Task.State.RUNNING);

        StateMachineTask lastTask = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
        lastTask.transitionTo(Task.State.RESTORING);
        lastTask.transitionTo(Task.State.RUNNING);

        taskManager.addTask(firstTask);
        taskManager.addTask(timingOutTask);
        taskManager.addTask(lastTask);

        firstTask.addRecords(t1p0, Arrays.asList(getConsumerRecord(t1p0, 0L), getConsumerRecord(t1p0, 1L)));
        timingOutTask.addRecords(t1p1, Arrays.asList(getConsumerRecord(t1p1, 0L), getConsumerRecord(t1p1, 1L)));
        lastTask.addRecords(t1p2, Arrays.asList(getConsumerRecord(t1p2, 0L), getConsumerRecord(t1p2, 1L)));

        // While the timeout is pending only two records are expected to be processed.
        MatcherAssert.assertThat(taskManager.process(1, time), Matchers.is(2));
        MatcherAssert.assertThat(timingOutTask.timeout, IsEqual.equalTo(time.milliseconds()));

        // Clear the exception so the middle task can make progress again.
        pendingTimeout.set(null);
        MatcherAssert.assertThat(taskManager.process(1, time), Matchers.is(3));
        MatcherAssert.assertThat(timingOutTask.timeout, IsEqual.equalTo(null));

        MatcherAssert.assertThat(taskManager.process(1, time), Matchers.is(1));
    }

    /**
     * A task whose {@code process} always throws {@link TaskMigratedException} is assigned,
     * restored to RUNNING and given one record; {@code taskManager.process(1, time)} is expected
     * to throw a {@link TaskMigratedException}.
     */
    @Test
    public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() {
        StateMachineTask migratingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            @Override
            public boolean process(long wallClockTime) {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(migratingTask));

        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(migratingTask.state(), Matchers.is(Task.State.RUNNING));

        // Buffer a single record on the task's first input partition so process() is invoked.
        TopicPartition inputPartition = taskId00Partitions.iterator().next();
        migratingTask.addRecords(inputPartition, Collections.singletonList(getConsumerRecord(inputPartition, 0L)));

        Assert.assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time));
    }

    /**
     * A task whose {@code process} throws a plain {@code RuntimeException("oops")} is assigned,
     * restored to RUNNING and given one record. {@code taskManager.process(1, time)} is expected
     * to throw a {@link StreamsException} that carries {@code taskId00} and whose cause has the
     * message {@code "oops"}.
     */
    @Test
    public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() {
        StateMachineTask failingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            @Override
            public boolean process(long wallClockTime) {
                throw new RuntimeException("oops");
            }
        };

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(failingTask));

        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.RUNNING));

        TopicPartition inputPartition = taskId00Partitions.iterator().next();
        failingTask.addRecords(inputPartition, Collections.singletonList(getConsumerRecord(inputPartition, 0L)));

        StreamsException wrapped =
            Assert.assertThrows(StreamsException.class, () -> taskManager.process(1, time));
        MatcherAssert.assertThat(wrapped.taskId().isPresent(), Matchers.is(true));
        MatcherAssert.assertThat(wrapped.taskId().get(), Matchers.is(taskId00));
        MatcherAssert.assertThat(wrapped.getCause().getMessage(), Matchers.is("oops"));
    }

    /**
     * A RUNNING task whose {@code maybePunctuateStreamTime} throws {@link TaskMigratedException}
     * is expected to make {@code taskManager.punctuate()} throw a {@link TaskMigratedException}.
     */
    @Test
    public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() {
        StateMachineTask migratingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            public boolean maybePunctuateStreamTime() {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(migratingTask));

        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(migratingTask.state(), Matchers.is(Task.State.RUNNING));

        Assert.assertThrows(TaskMigratedException.class, () -> taskManager.punctuate());
    }

    /**
     * A RUNNING task whose {@code maybePunctuateStreamTime} throws {@code KafkaException("oops")}
     * is expected to make {@code taskManager.punctuate()} throw a {@link KafkaException}.
     */
    @Test
    public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() {
        StateMachineTask failingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            public boolean maybePunctuateStreamTime() {
                throw new KafkaException("oops");
            }
        };

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(failingTask));

        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.RUNNING));

        Assert.assertThrows(KafkaException.class, () -> taskManager.punctuate());
    }

    /**
     * A single RUNNING task reports {@code true} from both {@code maybePunctuateStreamTime} and
     * {@code maybePunctuateSystemTime}; {@code taskManager.punctuate()} is expected to return 2.
     */
    @Test
    public void shouldPunctuateActiveTasks() {
        StateMachineTask punctuatingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            public boolean maybePunctuateStreamTime() {
                return true;
            }

            public boolean maybePunctuateSystemTime() {
                return true;
            }
        };

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(punctuatingTask));

        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
        MatcherAssert.assertThat(punctuatingTask.state(), Matchers.is(Task.State.RUNNING));

        MatcherAssert.assertThat(taskManager.punctuate(), IsEqual.equalTo(2));
    }

    /**
     * A task that reports a non-empty changelog partition set ({@code fake-0}) is assigned.
     * {@code tryToCompleteRestoration} is expected to return {@code false}, the task is expected
     * to stay in RESTORING, and the consumer mock must not have been touched.
     */
    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        StateMachineTask restoringTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            @Override
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(new TopicPartition("fake", 0));
            }
        };

        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(restoringTask));

        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());

        boolean restorationCompleted = taskManager.tryToCompleteRestoration(time.milliseconds(), null);
        MatcherAssert.assertThat(restorationCompleted, Matchers.is(false));
        MatcherAssert.assertThat(restoringTask.state(), Matchers.is(Task.State.RESTORING));
        Mockito.verifyNoInteractions(consumer);
    }

    /**
     * Revokes {@code t1p0} together with a partition ({@code unknown-0}) that is not part of the
     * assignment made in this test. The running task is expected to end up SUSPENDED, and the
     * captured {@link TaskManager} log output is expected to contain the message listing
     * {@code [unknown-0]} as missing from the current task partitions.
     */
    @Test
    public void shouldHaveRemainingPartitionsUncleared() {
        StateMachineTask runningTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
        Map<TopicPartition, OffsetAndMetadata> committable =
            Collections.singletonMap(t1p0, new OffsetAndMetadata(0L, null));
        runningTask.setCommittableOffsetsAndMetadata(committable);

        Mockito.when(consumer.assignment()).thenReturn(assignment);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(runningTask));

        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(TaskManager.class)) {
            logCapture.setClassLoggerToDebug(TaskManager.class);

            taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
            MatcherAssert.assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), Matchers.is(true));
            MatcherAssert.assertThat(runningTask.state(), Matchers.is(Task.State.RUNNING));

            TopicPartition unknownPartition = new TopicPartition("unknown", 0);
            taskManager.handleRevocation(Utils.mkSet(t1p0, unknownPartition));
            MatcherAssert.assertThat(runningTask.state(), Matchers.is(Task.State.SUSPENDED));

            List<String> loggedMessages = logCapture.getMessages();
            MatcherAssert.assertThat(
                loggedMessages,
                CoreMatchers.hasItem("taskManagerTestThe following revoked partitions [unknown-0] are missing from the current task partitions. It could potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback."));
        }
    }

    /**
     * Two tasks (built with the boolean constructor flag set to {@code false}) both throw
     * {@link TaskMigratedException} from {@code suspend()} after delegating to the super
     * implementation. Handling an empty assignment is expected to throw a
     * {@link TaskMigratedException} carrying the second task's message.
     */
    @Test
    public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
        StateMachineTask firstMigrated = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask secondMigrated = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t2 close exception", new RuntimeException());
            }
        };

        taskManager.addTask(firstMigrated);
        taskManager.addTask(secondMigrated);

        TaskMigratedException migrated = Assert.assertThrows(
            TaskMigratedException.class,
            () -> taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));
        MatcherAssert.assertThat(
            migrated.getMessage(),
            IsEqual.equalTo("t2 close exception; it means all tasks belonging to this thread should be migrated."));
    }

    /**
     * One task throws {@link TaskMigratedException} from {@code suspend()} and the other throws
     * an {@link IllegalStateException}. Handling an empty assignment is expected to throw a
     * {@link RuntimeException} whose message names task {@code 0_2} and whose cause carries the
     * illegal-state message.
     */
    @Test
    public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
        StateMachineTask migratedTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask illegalStateTask = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
            }
        };

        taskManager.addTask(migratedTask);
        taskManager.addTask(illegalStateTask);

        RuntimeException fatal = Assert.assertThrows(
            RuntimeException.class,
            () -> taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));
        MatcherAssert.assertThat(fatal.getMessage(), IsEqual.equalTo("Encounter unexpected fatal error for task 0_2"));
        MatcherAssert.assertThat(fatal.getCause().getMessage(), IsEqual.equalTo("t2 illegal state exception"));
    }

    /**
     * One task throws {@link TaskMigratedException} from {@code suspend()} and the other throws
     * a {@link KafkaException}. Handling an empty assignment is expected to throw a
     * {@link StreamsException} tagged with {@code taskId02} whose cause carries the Kafka
     * exception's message.
     */
    @Test
    public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
        StateMachineTask migratedTask = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask kafkaFailureTask = new StateMachineTask(taskId02, taskId02Partitions, false, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new KafkaException("Kaboom for t2!", new RuntimeException());
            }
        };

        taskManager.addTask(migratedTask);
        taskManager.addTask(kafkaFailureTask);

        StreamsException streamsFailure = Assert.assertThrows(
            StreamsException.class,
            () -> taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()));
        MatcherAssert.assertThat(streamsFailure.taskId().isPresent(), Matchers.is(true));
        MatcherAssert.assertThat(streamsFailure.taskId().get(), Matchers.is(taskId02));
        MatcherAssert.assertThat(streamsFailure.getCause().getMessage(), IsEqual.equalTo("Kaboom for t2!"));
    }

    /**
     * Stubs {@code activeTaskCreator.producerMetrics()} with a one-entry metrics map and expects
     * {@code taskManager.producerMetrics()} to return an equal map.
     */
    @Test
    public void shouldTransmitProducerMetrics() {
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap<>());
        // The metric always measures 0.0; only its presence in the map matters below.
        Measurable constantZero = (config, now) -> 0.0;
        KafkaMetric metric = new KafkaMetric(new Object(), metricName, constantZero, null, new MockTime());
        Map<MetricName, KafkaMetric> stubbedMetrics = Collections.singletonMap(metricName, metric);

        // Raw casts are retained: the map's value type here is KafkaMetric, which may not match
        // the declared return type of producerMetrics() exactly.
        Mockito.when((Object) activeTaskCreator.producerMetrics()).thenReturn(stubbedMetrics);

        MatcherAssert.assertThat((Object) taskManager.producerMetrics(), (Matcher) Matchers.is(stubbedMetrics));
    }

    /**
     * Test helper: builds {@link StateMachineTask} instances for the three given assignments,
     * stubs both task creators to return them, runs {@code taskManager.handleAssignment} followed
     * by one {@code tryToCompleteRestoration} pass, and asserts the resulting states.
     *
     * @param runningActiveAssignment   active tasks expected to be RUNNING afterwards
     * @param standbyAssignment         tasks created with the active flag {@code false}; expected RUNNING
     * @param restoringActiveAssignment active tasks given a changelog offset entry
     *                                  ({@code changelog-0 -> 0}); expected to remain RESTORING
     * @return all created tasks keyed by task id
     */
    private Map<TaskId, StateMachineTask> handleAssignment(Map<TaskId, Set<TopicPartition>> runningActiveAssignment, Map<TaskId, Set<TopicPartition>> standbyAssignment, Map<TaskId, Set<TopicPartition>> restoringActiveAssignment) {
        Set<Task> runningTasks = new HashSet<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : runningActiveAssignment.entrySet()) {
            runningTasks.add(new StateMachineTask(entry.getKey(), entry.getValue(), true, stateManager));
        }

        Set<Task> standbyTasks = new HashSet<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyAssignment.entrySet()) {
            standbyTasks.add(new StateMachineTask(entry.getKey(), entry.getValue(), false, stateManager));
        }

        Set<Task> restoringTasks = new HashSet<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : restoringActiveAssignment.entrySet()) {
            restoringTasks.add(new StateMachineTask(entry.getKey(), entry.getValue(), true, stateManager));
        }
        // Give every restoring task a changelog offset entry only after all of them exist.
        for (Task restoring : restoringTasks) {
            ((StateMachineTask) restoring).setChangelogOffsets(
                Collections.singletonMap(new TopicPartition("changelog", 0), 0L));
        }

        // The task manager sees running and restoring tasks as one combined active assignment.
        Map<TaskId, Set<TopicPartition>> combinedActiveAssignment = new HashMap<>(runningActiveAssignment);
        combinedActiveAssignment.putAll(restoringActiveAssignment);
        Set<Task> combinedActiveTasks = new HashSet<>(runningTasks);
        combinedActiveTasks.addAll(restoringTasks);

        Mockito.when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(combinedActiveAssignment)))
            .thenReturn(combinedActiveTasks);
        Mockito.lenient().when(consumer.assignment()).thenReturn(assignment);

        taskManager.handleAssignment(combinedActiveAssignment, standbyAssignment);
        taskManager.tryToCompleteRestoration(time.milliseconds(), null);

        Map<TaskId, StateMachineTask> tasksById = new HashMap<>();
        runningTasks.forEach(task -> {
            MatcherAssert.assertThat(task.state(), Matchers.is(Task.State.RUNNING));
            tasksById.put(task.id(), (StateMachineTask) task);
        });
        restoringTasks.forEach(task -> {
            MatcherAssert.assertThat(task.state(), Matchers.is(Task.State.RESTORING));
            tasksById.put(task.id(), (StateMachineTask) task);
        });
        standbyTasks.forEach(task -> {
            MatcherAssert.assertThat(task.state(), Matchers.is(Task.State.RUNNING));
            tasksById.put(task.id(), (StateMachineTask) task);
        });
        return tasksById;
    }

    /**
     * Stubs {@code stateDirectory.lock(id)} to return {@code true} for each of the given task ids.
     *
     * @param tasks task ids whose lock attempt should be reported as successful
     */
    private void expectLockObtainedFor(TaskId ... tasks) {
        Arrays.stream(tasks)
            .forEach(id -> Mockito.when(stateDirectory.lock(id)).thenReturn(true));
    }

    /**
     * Stubs {@code stateDirectory.lock(id)} to return {@code false} for each of the given task ids.
     *
     * @param tasks task ids whose lock attempt should be reported as failed
     */
    private void expectLockFailedFor(TaskId ... tasks) {
        Arrays.stream(tasks)
            .forEach(id -> Mockito.when(stateDirectory.lock(id)).thenReturn(false));
    }

    /**
     * Stubs {@code stateDirectory.directoryForTaskIsEmpty(id)} to return {@code false} for each
     * of the given task ids.
     *
     * @param tasks task ids whose state directory should be reported as non-empty
     */
    private void expectDirectoryNotEmpty(TaskId ... tasks) {
        Arrays.stream(tasks)
            .forEach(id -> Mockito.when(stateDirectory.directoryForTaskIsEmpty(id)).thenReturn(false));
    }

    /**
     * The consumer mock throws {@link CommitFailedException} from {@code commitSync} for the
     * task's committable offsets. {@code commitAll()} is expected to throw a
     * {@link TaskMigratedException} caused by that exception, with the exact message asserted
     * below, while the task stays in CREATED.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnCommitFailed() {
        Map<TopicPartition, OffsetAndMetadata> committable =
            Collections.singletonMap(t1p0, new OffsetAndMetadata(0L, null));

        StateMachineTask committingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
        committingTask.setCommittableOffsetsAndMetadata(committable);
        committingTask.setCommitNeeded();
        taskManager.addTask(committingTask);

        Mockito.doThrow(new CommitFailedException()).when(consumer).commitSync(committable);

        TaskMigratedException migrated =
            Assert.assertThrows(TaskMigratedException.class, () -> taskManager.commitAll());

        MatcherAssert.assertThat(migrated.getCause(), Matchers.instanceOf(CommitFailedException.class));
        MatcherAssert.assertThat(
            migrated.getMessage(),
            IsEqual.equalTo("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated."));
        MatcherAssert.assertThat(committingTask.state(), Matchers.is(Task.State.CREATED));
    }

    /**
     * The consumer mock throws a {@link TimeoutException} on the first {@code commitSync} call
     * and succeeds on the second. Only {@code task00} is marked as needing a commit.
     *
     * <p>Expected: the first {@code commit} reports 0 committed tasks and records the current
     * mock time as {@code task00}'s timeout (leaving {@code task01}'s timeout null); the second
     * {@code commit} reports 1 and both timeouts are null; {@code commitSync} was called twice.
     *
     * <p>Fix: {@code task01}'s committable offsets are now keyed by {@code taskId01Partitions}.
     * They were previously built from {@code taskId00Partitions}, a copy-paste of the
     * {@code task00} line that gave the fixture offsets for partitions it was not built with.
     */
    @Test
    public void shouldNotFailForTimeoutExceptionOnConsumerCommit() {
        StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
        StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);

        // Each task gets zero-offset committable metadata for its OWN input partitions.
        task00.setCommittableOffsetsAndMetadata(
            taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0L))));
        task01.setCommittableOffsetsAndMetadata(
            taskId01Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0L))));

        // First commitSync call times out, the second one goes through.
        Mockito.doThrow(new TimeoutException("KABOOM!"))
            .doNothing()
            .when(consumer).commitSync(ArgumentMatchers.any(Map.class));

        task00.setCommitNeeded();

        MatcherAssert.assertThat(taskManager.commit(Utils.mkSet(task00, task01)), IsEqual.equalTo(0));
        MatcherAssert.assertThat(task00.timeout, IsEqual.equalTo(time.milliseconds()));
        Assert.assertNull(task01.timeout);

        MatcherAssert.assertThat(taskManager.commit(Utils.mkSet(task00, task01)), IsEqual.equalTo(1));
        Assert.assertNull(task00.timeout);
        Assert.assertNull(task01.timeout);

        Mockito.verify(consumer, Mockito.times(2)).commitSync(ArgumentMatchers.any(Map.class));
    }

    /**
     * Exercises {@code commit} on a task manager built with
     * {@code ProcessingMode.EXACTLY_ONCE_ALPHA} and a mocked {@link Tasks} registry, where every
     * task is handed the same mocked {@link StreamsProducer}.
     *
     * <p>The producer is stubbed to throw a {@link TimeoutException} on its first
     * {@code commitTransaction} call for task 0_0's offsets and to do nothing for task 0_1's
     * offsets. The test then expects {@code commit} to throw a {@link TaskCorruptedException}
     * whose corrupted-task set is exactly {@code {taskId00}}, and {@code consumer.groupMetadata()}
     * to have been called twice.
     *
     * <p>NOTE(review): the method name says "should not fail", yet the body asserts that a
     * {@code TaskCorruptedException} is thrown - confirm whether the name is stale.
     */
    @Test
    public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
        Tasks tasks = (Tasks)Mockito.mock(Tasks.class);
        // Local task manager (shadows the field) wired to the mocked registry.
        TaskManager taskManager = this.setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, (TasksRegistry)tasks, false);
        StreamsProducer producer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        // Every task id resolves to the same producer mock.
        Mockito.when((Object)this.activeTaskCreator.streamsProducerForTask((TaskId)ArgumentMatchers.any(TaskId.class))).thenReturn((Object)producer);
        Map<TopicPartition, OffsetAndMetadata> offsetsT00 = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsT01 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, null));
        // Consecutive stubbing: the first commit of task 0_0's offsets times out, later ones succeed.
        ((StreamsProducer)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException("KABOOM!")}).doNothing().doNothing().doNothing().when((Object)producer)).commitTransaction(offsetsT00, null);
        // Commits of task 0_1's offsets always succeed.
        ((StreamsProducer)Mockito.doNothing().doNothing().when((Object)producer)).commitTransaction(offsetsT01, null);
        StateMachineTask task00 = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, this.stateManager);
        task00.setCommittableOffsetsAndMetadata(offsetsT00);
        StateMachineTask task01 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, this.stateManager);
        task01.setCommittableOffsetsAndMetadata(offsetsT01);
        // task02 has no committable offsets and is never marked commit-needed.
        StateMachineTask task02 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, this.stateManager);
        Mockito.when((Object)tasks.allTasks()).thenReturn((Object)Utils.mkSet((Object[])new Task[]{task00, task01, task02}));
        task00.setCommitNeeded();
        task01.setCommitNeeded();
        TaskCorruptedException exception = (TaskCorruptedException)Assert.assertThrows(TaskCorruptedException.class, () -> taskManager.commit((Collection)Utils.mkSet((Object[])new Task[]{task00, task01, task02})));
        // Only the task whose commit timed out is expected to be reported as corrupted.
        MatcherAssert.assertThat((Object)exception.corruptedTasks(), (Matcher)IsEqual.equalTo(Collections.singleton(this.taskId00)));
        ((Consumer)Mockito.verify(this.consumer, (VerificationMode)Mockito.times((int)2))).groupMetadata();
    }

    /**
     * Uses a task manager built with {@code ProcessingMode.EXACTLY_ONCE_V2} and a single mocked
     * thread producer. The producer throws a {@link TimeoutException} on the first
     * {@code commitTransaction} of the combined offsets of tasks 0_0 and 0_1. {@code commit} is
     * expected to throw a {@link TaskCorruptedException} naming both of those tasks, and
     * {@code consumer.groupMetadata()} is expected to have been called once.
     */
    @Test
    public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
        TaskManager eosTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);

        StreamsProducer threadProducer = Mockito.mock(StreamsProducer.class);
        Mockito.when(activeTaskCreator.threadProducer()).thenReturn(threadProducer);

        Map<TopicPartition, OffsetAndMetadata> offsetsT00 =
            Collections.singletonMap(t1p0, new OffsetAndMetadata(0L, null));
        Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
            Collections.singletonMap(t1p1, new OffsetAndMetadata(1L, null));
        Map<TopicPartition, OffsetAndMetadata> combinedOffsets = new HashMap<>(offsetsT00);
        combinedOffsets.putAll(offsetsT01);

        // First commit of the combined offsets times out; a later one would succeed.
        Mockito.doThrow(new TimeoutException("KABOOM!"))
            .doNothing()
            .when(threadProducer).commitTransaction(combinedOffsets, null);

        StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
        task00.setCommittableOffsetsAndMetadata(offsetsT00);
        StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
        task01.setCommittableOffsetsAndMetadata(offsetsT01);
        StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager);

        task00.setCommitNeeded();
        task01.setCommitNeeded();

        TaskCorruptedException corrupted = Assert.assertThrows(
            TaskCorruptedException.class,
            () -> eosTaskManager.commit(Utils.mkSet(task00, task01, task02)));

        MatcherAssert.assertThat(corrupted.corruptedTasks(), IsEqual.equalTo(Utils.mkSet(taskId00, taskId01)));
        Mockito.verify(consumer).groupMetadata();
    }

    /**
     * The consumer mock throws a {@link KafkaException} from {@code commitSync} for the task's
     * committable offsets. {@code commitAll()} is expected to throw a {@link StreamsException}
     * caused by it, with the message {@code "Error encountered committing offsets via consumer"},
     * while the task stays in CREATED.
     */
    @Test
    public void shouldStreamsExceptionOnCommitError() {
        Map<TopicPartition, OffsetAndMetadata> committable =
            Collections.singletonMap(t1p0, new OffsetAndMetadata(0L, null));

        StateMachineTask committingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
        committingTask.setCommittableOffsetsAndMetadata(committable);
        committingTask.setCommitNeeded();
        taskManager.addTask(committingTask);

        Mockito.doThrow(new KafkaException()).when(consumer).commitSync(committable);

        StreamsException commitFailure =
            Assert.assertThrows(StreamsException.class, () -> taskManager.commitAll());

        MatcherAssert.assertThat(commitFailure.getCause(), Matchers.instanceOf(KafkaException.class));
        MatcherAssert.assertThat(
            commitFailure.getMessage(),
            IsEqual.equalTo("Error encountered committing offsets via consumer"));
        MatcherAssert.assertThat(committingTask.state(), Matchers.is(Task.State.CREATED));
    }

    /**
     * The consumer mock throws {@code RuntimeException("KABOOM")} from {@code commitSync}.
     * {@code commitAll()} is expected to throw a {@link RuntimeException} with that same
     * message, while the task stays in CREATED.
     */
    @Test
    public void shouldFailOnCommitFatal() {
        Map<TopicPartition, OffsetAndMetadata> committable =
            Collections.singletonMap(t1p0, new OffsetAndMetadata(0L, null));

        StateMachineTask committingTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
        committingTask.setCommittableOffsetsAndMetadata(committable);
        committingTask.setCommitNeeded();
        taskManager.addTask(committingTask);

        Mockito.doThrow(new RuntimeException("KABOOM")).when(consumer).commitSync(committable);

        RuntimeException fatal =
            Assert.assertThrows(RuntimeException.class, () -> taskManager.commitAll());

        MatcherAssert.assertThat(fatal.getMessage(), IsEqual.equalTo("KABOOM"));
        MatcherAssert.assertThat(committingTask.state(), Matchers.is(Task.State.CREATED));
    }

    /**
     * Two active tasks are assigned; the first one throws {@code RuntimeException("KABOOM!")}
     * from {@code suspend()} after delegating to the super implementation. Revoking both
     * partitions is expected to throw a {@link RuntimeException} whose cause carries that
     * message, to leave both tasks SUSPENDED, and to never touch the consumer mock.
     */
    @Test
    public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation() {
        StateMachineTask failingTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
            @Override
            public void suspend() {
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        StateMachineTask healthyTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);

        Map<TaskId, Set<TopicPartition>> bothTasksAssignment = new HashMap<>(taskId00Assignment);
        bothTasksAssignment.putAll(taskId01Assignment);

        Mockito.when(activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(bothTasksAssignment)))
            .thenReturn(Arrays.<Task>asList(failingTask, healthyTask));
        taskManager.handleAssignment(bothTasksAssignment, Collections.emptyMap());

        RuntimeException revocationFailure = Assert.assertThrows(
            RuntimeException.class,
            () -> taskManager.handleRevocation(Arrays.asList(t1p0, t1p1)));

        MatcherAssert.assertThat(revocationFailure.getCause().getMessage(), Matchers.is("KABOOM!"));
        MatcherAssert.assertThat(failingTask.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(healthyTask.state(), Matchers.is(Task.State.SUSPENDED));
        Mockito.verifyNoInteractions(consumer);
    }

    /**
     * Hands task {@code taskId00} to the task manager as an active task and then as a standby
     * task. Verifies that the task producer is closed for that id, that the active creator was
     * asked once for an empty assignment, that the standby creator was asked twice for an empty
     * assignment, and that the consumer mock was never used.
     */
    @Test
    public void shouldConvertActiveTaskToStandbyTask() {
        final StreamTask active = Mockito.mock(StreamTask.class);
        Mockito.when(active.id()).thenReturn(this.taskId00);
        Mockito.when(active.inputPartitions()).thenReturn(this.taskId00Partitions);
        Mockito.when(active.isActive()).thenReturn(true);

        final StandbyTask standby = Mockito.mock(StandbyTask.class);
        Mockito.when(standby.id()).thenReturn(this.taskId00);

        Mockito.when(this.activeTaskCreator.createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(this.taskId00Assignment)))
            .thenReturn(Collections.<Task>singletonList(active));
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(ArgumentMatchers.any(), ArgumentMatchers.eq(this.taskId00Partitions)))
            .thenReturn(standby);

        // First as active, then the same task id as standby.
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);

        Mockito.verify(this.activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        Mockito.verify(this.activeTaskCreator)
            .createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(Collections.emptyMap()));
        Mockito.verify(this.standbyTaskCreator, Mockito.times(2)).createTasks(Collections.emptyMap());
        Mockito.verifyNoInteractions(this.consumer);
    }

    /**
     * Hands task {@code taskId00} to the task manager as a standby task and then as an active
     * task. Verifies that the active creator was asked twice for an empty assignment, that the
     * standby creator was asked once for an empty assignment, and that the consumer mock was
     * never used.
     */
    @Test
    public void shouldConvertStandbyTaskToActiveTask() {
        final StandbyTask standby = Mockito.mock(StandbyTask.class);
        Mockito.when(standby.id()).thenReturn(this.taskId00);
        Mockito.when(standby.isActive()).thenReturn(false);
        Mockito.when(standby.prepareCommit()).thenReturn(Collections.emptyMap());

        final StreamTask active = Mockito.mock(StreamTask.class);
        Mockito.when(active.id()).thenReturn(this.taskId00);
        Mockito.when(active.inputPartitions()).thenReturn(this.taskId00Partitions);

        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId00Assignment))
            .thenReturn(Collections.<Task>singletonList(standby));
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby(
                ArgumentMatchers.eq(standby), ArgumentMatchers.eq(this.taskId00Partitions), ArgumentMatchers.any()))
            .thenReturn(active);

        // First as standby, then the same task id as active.
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());

        Mockito.verify(this.activeTaskCreator, Mockito.times(2))
            .createTasks(ArgumentMatchers.any(), ArgumentMatchers.eq(Collections.emptyMap()));
        Mockito.verify(this.standbyTaskCreator).createTasks(Collections.emptyMap());
        Mockito.verifyNoInteractions(this.consumer);
    }

    /**
     * Checks that {@code taskManager.notPausedTasks()} reports two tasks after the assignment
     * set up through this test's {@code handleAssignment} helper, and none once the
     * {@code "__UNNAMED_TOPOLOGY__"} topology has been paused.
     */
    @Test
    public void shouldListNotPausedTasks() {
        this.handleAssignment(this.taskId00Assignment, this.taskId01Assignment, Collections.emptyMap());

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
        // message read "expected:<2> but was:<n>" rather than the reverse.
        Assert.assertEquals(2L, (long)this.taskManager.notPausedTasks().size());

        this.topologyMetadata.pauseTopology("__UNNAMED_TOPOLOGY__");

        Assert.assertEquals(0L, (long)this.taskManager.notPausedTasks().size());
    }

    /**
     * Creates a future that is already completed with a {@code null} value.
     *
     * @return a completed {@code KafkaFutureImpl<DeletedRecords>} whose result is {@code null}
     */
    private static KafkaFutureImpl<DeletedRecords> completedFuture() {
        // Parameterized instead of raw so the return needs no unchecked conversion.
        final KafkaFutureImpl<DeletedRecords> future = new KafkaFutureImpl<DeletedRecords>();
        future.complete(null);
        return future;
    }

    /**
     * Creates one folder per given name under {@code testFolder} and stubs
     * {@code stateDirectory.listNonEmptyTaskDirectories()} to return them, in the given order,
     * each wrapped in a {@code StateDirectory.TaskDirectory} with a {@code null} second argument.
     *
     * @param names folder names to create
     * @throws Exception if a folder cannot be created
     */
    private void makeTaskFolders(String ... names) throws Exception {
        final List<StateDirectory.TaskDirectory> directories =
            new ArrayList<StateDirectory.TaskDirectory>(names.length);
        for (final String name : names) {
            final File folder = this.testFolder.newFolder(name);
            directories.add(new StateDirectory.TaskDirectory(folder, null));
        }
        Mockito.when(this.stateDirectory.listNonEmptyTaskDirectories()).thenReturn(directories);
    }

    /**
     * Creates the checkpoint file for {@code task}, writes {@code offsets} into it through
     * {@code OffsetCheckpoint}, leniently stubs {@code stateDirectory.checkpointFileFor(task)}
     * to return that file, and finally calls {@code expectDirectoryNotEmpty(task)}.
     *
     * @param task    task whose checkpoint file is written
     * @param offsets offsets to store in the checkpoint
     * @throws Exception if the file cannot be created or written
     */
    private void writeCheckpointFile(TaskId task, Map<TopicPartition, Long> offsets) throws Exception {
        final File checkpoint = this.getCheckpointFile(task);
        // createFile fails if the file already exists, so each checkpoint is written at most once.
        Files.createFile(checkpoint.toPath());
        final OffsetCheckpoint writer = new OffsetCheckpoint(checkpoint);
        writer.write(offsets);
        Mockito.lenient().when(this.stateDirectory.checkpointFileFor(task)).thenReturn(checkpoint);
        this.expectDirectoryNotEmpty(task);
    }

    /**
     * Builds the path {@code <testFolder root>/<task.toString()>/.checkpoint}. Nothing is created
     * on disk here.
     *
     * @param task task whose string form names the parent directory
     * @return the {@code .checkpoint} file handle inside that directory
     */
    private File getCheckpointFile(TaskId task) {
        final File taskDirectory = new File(this.testFolder.getRoot(), task.toString());
        return new File(taskDirectory, ".checkpoint");
    }

    /**
     * Builds a consumer record for the given partition and offset with a {@code null} key and a
     * {@code null} value.
     *
     * @param topicPartition supplies the record's topic and partition
     * @param offset         offset of the record
     * @return the new record
     */
    private static ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long offset) {
        // Parameterized instead of raw so the return needs no unchecked conversion.
        return new ConsumerRecord<byte[], byte[]>(topicPartition.topic(), topicPartition.partition(), offset, null, null);
    }

    /**
     * Hand-written {@link Task} used by the tests in this file. It moves between
     * {@code Task.State} values via {@code transitionTo(...)} and records, in plain fields, which
     * commit hooks were called and which offsets/records it was given. It performs no real
     * processing.
     */
    private static class StateMachineTask
    extends AbstractTask
    implements Task {
        /** Whether this instance plays the role of an active task ({@code true}) or a standby. */
        private final boolean active;
        private boolean commitNeeded = false;
        private boolean commitRequested = false;
        /** Set once {@link #prepareCommit()} has been called. */
        private boolean commitPrepared = false;
        /** Set once {@link #postCommit(boolean)} has been called. */
        private boolean commitCompleted = false;
        private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
        // Starts empty like the other offset maps so that purgeableOffsets() never returns null
        // for a task on which setPurgeableOffsets(...) was not called.
        private Map<TopicPartition, Long> purgeableOffsets = Collections.emptyMap();
        private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
        private Set<TopicPartition> partitionsForOffsetReset = Collections.emptySet();
        /** Wall-clock value of the last {@link #maybeInitTaskTimeoutOrThrow}; {@code null} when cleared. */
        private Long timeout = null;
        /** Records handed to {@link #addRecords}, grouped per partition in arrival order. */
        private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>>();

        /**
         * @param id                    task id
         * @param partitions            input partitions of the task
         * @param active                {@code true} for an active task, {@code false} for a standby
         * @param processorStateManager state manager passed through to the super constructor
         */
        StateMachineTask(TaskId id, Set<TopicPartition> partitions, boolean active, ProcessorStateManager processorStateManager) {
            super(id, null, null, processorStateManager, partitions, new TopologyConfig((StreamsConfig)new DummyStreamsConfig()).getTaskConfig(), "test-task", StateMachineTask.class);
            this.active = active;
        }

        /**
         * From {@code CREATED}, moves to {@code RESTORING}; a standby then moves straight on to
         * {@code RUNNING}. Does nothing in any other state.
         */
        public void initializeIfNeeded() {
            if (this.state() != Task.State.CREATED) {
                return;
            }
            this.transitionTo(Task.State.RESTORING);
            if (!this.active) {
                this.transitionTo(Task.State.RUNNING);
            }
        }

        /** Remembers the given set; it replaces any previously stored set. */
        public void addPartitionsForOffsetReset(Set<TopicPartition> partitionsForOffsetReset) {
            this.partitionsForOffsetReset = partitionsForOffsetReset;
        }

        /**
         * Moves the task to {@code RUNNING} unless it is already there.
         *
         * @param offsetResetter ignored by this test task
         */
        public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
            if (this.state() == Task.State.RUNNING) {
                return;
            }
            this.transitionTo(Task.State.RUNNING);
        }

        /** Marks the task as needing a commit. */
        public void setCommitNeeded() {
            this.commitNeeded = true;
        }

        public boolean commitNeeded() {
            return this.commitNeeded;
        }

        /** Marks the task as having requested a commit. */
        public void setCommitRequested() {
            this.commitRequested = true;
        }

        public boolean commitRequested() {
            return this.commitRequested;
        }

        /**
         * Flags the commit as prepared.
         *
         * @return the configured committable offsets if a commit is needed, otherwise an empty map
         */
        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            this.commitPrepared = true;
            return this.commitNeeded ? this.committableOffsets : Collections.emptyMap();
        }

        /**
         * Clears the commit-needed flag and flags the commit as completed.
         *
         * @param enforceCheckpoint ignored by this test task
         */
        public void postCommit(boolean enforceCheckpoint) {
            this.commitNeeded = false;
            this.commitCompleted = true;
        }

        /**
         * Moves the task to {@code SUSPENDED}; a no-op if it is already suspended.
         *
         * @throws IllegalStateException if the task is {@code CLOSED}
         */
        public void suspend() {
            if (this.state() == Task.State.CLOSED) {
                throw new IllegalStateException("Illegal state " + this.state() + " while suspending active task " + this.id);
            }
            if (this.state() != Task.State.SUSPENDED) {
                this.transitionTo(Task.State.SUSPENDED);
            }
        }

        /** Moves a {@code SUSPENDED} task back to {@code RUNNING}; otherwise does nothing. */
        public void resume() {
            if (this.state() == Task.State.SUSPENDED) {
                this.transitionTo(Task.State.RUNNING);
            }
        }

        /** Resets both commit flags, then delegates to {@code super.revive()}. */
        public void revive() {
            this.commitNeeded = false;
            this.commitRequested = false;
            super.revive();
        }

        /**
         * Records {@code currentWallClockMs} in {@code timeout}. This test task never throws here.
         *
         * @param currentWallClockMs value to remember
         * @param cause              ignored by this test task
         */
        public void maybeInitTaskTimeoutOrThrow(long currentWallClockMs, Exception cause) {
            this.timeout = currentWallClockMs;
        }

        /** Resets {@code timeout} to {@code null}. */
        public void clearTaskTimeout() {
            this.timeout = null;
        }

        /** Intentionally empty: this test task records no restoration metrics. */
        public void recordRestoration(Time time, long numRecords, boolean initRemaining) {
        }

        /** Moves the task to {@code CLOSED}. */
        public void closeClean() {
            this.transitionTo(Task.State.CLOSED);
        }

        /** Moves the task to {@code CLOSED}. */
        public void closeDirty() {
            this.transitionTo(Task.State.CLOSED);
        }

        /** Moves the task to {@code CLOSED}. */
        public void prepareRecycle() {
            this.transitionTo(Task.State.CLOSED);
        }

        /** Intentionally empty. */
        public void resumePollingForPartitionsWithAvailableSpace() {
        }

        /** Intentionally empty. */
        public void updateLags() {
        }

        /**
         * Replaces the task's input partitions.
         *
         * @param topicPartitions                the new input partitions
         * @param allTopologyNodesToSourceTopics ignored by this test task
         */
        public void updateInputPartitions(Set<TopicPartition> topicPartitions, Map<String, List<String>> allTopologyNodesToSourceTopics) {
            this.inputPartitions = topicPartitions;
        }

        /**
         * Sets the offsets that {@link #prepareCommit()} returns when a commit is needed.
         *
         * @throws IllegalStateException if this task is not active
         */
        void setCommittableOffsetsAndMetadata(Map<TopicPartition, OffsetAndMetadata> committableOffsets) {
            if (!this.active) {
                throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks");
            }
            this.committableOffsets = committableOffsets;
        }

        /** @return always {@code null}; this test task has no stores */
        public StateStore getStore(String name) {
            return null;
        }

        /** @return the key set of the configured changelog offsets */
        public Set<TopicPartition> changelogPartitions() {
            return this.changelogOffsets.keySet();
        }

        public boolean isActive() {
            return this.active;
        }

        /** Sets the map returned by {@link #purgeableOffsets()}. */
        void setPurgeableOffsets(Map<TopicPartition, Long> purgeableOffsets) {
            this.purgeableOffsets = purgeableOffsets;
        }

        /** @return the configured purgeable offsets; an empty map if none were set */
        public Map<TopicPartition, Long> purgeableOffsets() {
            return this.purgeableOffsets;
        }

        /** Sets the map returned by {@link #changelogOffsets()}. */
        void setChangelogOffsets(Map<TopicPartition, Long> changelogOffsets) {
            this.changelogOffsets = changelogOffsets;
        }

        public Map<TopicPartition, Long> changelogOffsets() {
            return this.changelogOffsets;
        }

        /** @return always an empty map */
        public Map<TopicPartition, Long> committedOffsets() {
            return Collections.emptyMap();
        }

        /** @return always an empty map */
        public Map<TopicPartition, Long> highWaterMark() {
            return Collections.emptyMap();
        }

        /** @return always {@code Optional.empty()} */
        public Optional<Long> timeCurrentIdlingStarted() {
            return Optional.empty();
        }

        /**
         * Appends the records to this task's per-partition queue.
         *
         * @throws IllegalStateException if this task is not active
         */
        public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
            if (!this.isActive()) {
                throw new IllegalStateException("Can't add records to an inactive task.");
            }
            final Deque<ConsumerRecord<byte[], byte[]>> partitionQueue =
                this.queue.computeIfAbsent(partition, k -> new LinkedList<>());
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                partitionQueue.add(record);
            }
        }

        /**
         * Removes at most one queued record: the head of the first non-empty partition queue
         * encountered while iterating the queue map.
         *
         * @param wallClockTime ignored by this test task
         * @return {@code true} if a record was removed, {@code false} if every queue was empty
         * @throws IllegalStateException if the task is not active or not {@code RUNNING}
         */
        public boolean process(long wallClockTime) {
            if (!this.isActive() || this.state() != Task.State.RUNNING) {
                throw new IllegalStateException("Can't process an inactive or non-running task.");
            }
            for (final LinkedList<ConsumerRecord<byte[], byte[]>> records : this.queue.values()) {
                if (records.poll() != null) {
                    return true;
                }
            }
            return false;
        }
    }
}

