/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.DefaultStateUpdater;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class DefaultStateUpdaterTest {
    private static final int COMMIT_INTERVAL = 100;
    private static final long CALL_TIMEOUT = 1000L;
    private static final long VERIFICATION_TIMEOUT = 30000L;
    private static final TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
    private static final TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
    private static final TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0);
    private static final TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1);
    private static final TopicPartition TOPIC_PARTITION_C_0 = new TopicPartition("topicC", 0);
    private static final TopicPartition TOPIC_PARTITION_D_0 = new TopicPartition("topicD", 0);
    private static final TaskId TASK_0_0 = new TaskId(0, 0);
    private static final TaskId TASK_0_1 = new TaskId(0, 1);
    private static final TaskId TASK_0_2 = new TaskId(0, 2);
    private static final TaskId TASK_1_0 = new TaskId(1, 0);
    private static final TaskId TASK_1_1 = new TaskId(1, 1);
    private static final TaskId TASK_A_0_0 = new TaskId(0, 0, "A");
    private static final TaskId TASK_A_0_1 = new TaskId(0, 1, "A");
    private static final TaskId TASK_B_0_0 = new TaskId(0, 0, "B");
    private static final TaskId TASK_B_0_1 = new TaskId(0, 1, "B");
    private final Time time = new MockTime(1L);
    private final Metrics metrics = new Metrics(this.time);
    private final StreamsConfig config = new StreamsConfig((Map)this.configProps(100));
    private final ChangelogReader changelogReader = (ChangelogReader)Mockito.mock(ChangelogReader.class);
    private final TopologyMetadata topologyMetadata = StreamsTestUtils.TopologyMetadataBuilder.unnamedTopology().build();
    private DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", this.metrics, this.config, this.changelogReader, this.topologyMetadata, this.time);

    DefaultStateUpdaterTest() {
    }

    @AfterEach
    public void tearDown() {
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
    }

    private Properties configProps(int commitInterval) {
        return Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once_v2"), Utils.mkEntry((Object)"commit.interval.ms", (Object)commitInterval), Utils.mkEntry((Object)StreamsConfig.producerPrefix((String)"transaction.timeout.ms"), (Object)commitInterval)}));
    }

    @Test
    public void shouldShutdownStateUpdater() {
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).clear();
    }

    @Test
    public void shouldShutdownStateUpdaterAndRestart() {
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.stateUpdater.start();
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)2))).clear();
    }

    @Test
    public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
        StreamTask statelessTask = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).inState(Task.State.RESTORING).build();
        StreamTask statefulTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        this.stateUpdater.add((Task)statelessTask);
        this.stateUpdater.add((Task)statefulTask);
        this.stateUpdater.remove(TASK_1_1);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyRemovedTasks(new Task[0]);
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.verifyRemovedTasks(new Task[]{statelessTask, statefulTask, standbyTask});
    }

    @Test
    public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
        this.stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
        this.stateUpdater = new DefaultStateUpdater("test-state-updater", this.metrics, new StreamsConfig((Map)this.configProps(Integer.MAX_VALUE)), this.changelogReader, this.topologyMetadata, this.time);
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyUpdatingTasks(new Task[]{activeTask, standbyTask});
        this.verifyRemovedTasks(new Task[0]);
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.verifyRemovedTasks(new Task[]{activeTask, standbyTask});
        ((StreamTask)Mockito.verify((Object)activeTask)).maybeCheckpoint(true);
        ((StandbyTask)Mockito.verify((Object)standbyTask)).maybeCheckpoint(true);
    }

    @Test
    public void shouldRemovePausedTasksOnShutdown() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_1})).inState(Task.State.RUNNING).build();
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyUpdatingTasks(new Task[]{activeTask, standbyTask});
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(new Task[]{activeTask, standbyTask});
        this.verifyRemovedTasks(new Task[0]);
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.verifyRemovedTasks(new Task[]{activeTask, standbyTask});
    }

    @Test
    public void shouldRemovePausedAndUpdatingTasksOnShutdown() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_A_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_B_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.topologyMetadata.isPaused(standbyTask.id().topologyName())).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyPausedTasks(new Task[]{standbyTask});
        this.verifyUpdatingTasks(new Task[]{activeTask});
        this.verifyRemovedTasks(new Task[0]);
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        this.verifyRemovedTasks(new Task[]{activeTask, standbyTask});
    }

    @Test
    public void shouldThrowIfStatelessTaskNotInStateRestoring() {
        this.shouldThrowIfActiveTaskNotInStateRestoring(StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).build());
    }

    @Test
    public void shouldThrowIfStatefulTaskNotInStateRestoring() {
        this.shouldThrowIfActiveTaskNotInStateRestoring(StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).build());
    }

    private void shouldThrowIfActiveTaskNotInStateRestoring(StreamTask task) {
        this.shouldThrowIfTaskNotInGivenState((Task)task, Task.State.RESTORING);
    }

    @Test
    public void shouldThrowIfStandbyTaskNotInStateRunning() {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).build();
        this.shouldThrowIfTaskNotInGivenState((Task)task, Task.State.RUNNING);
    }

    private void shouldThrowIfTaskNotInGivenState(Task task, Task.State correctState) {
        for (Task.State state : Task.State.values()) {
            if (state == correctState) continue;
            Mockito.when((Object)task.state()).thenReturn((Object)state);
            Assertions.assertThrows(IllegalStateException.class, () -> this.stateUpdater.add(task));
        }
    }

    @Test
    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldThrowIfAddingTasksWithSameId((Task)task1, (Task)task2);
    }

    @Test
    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
        StandbyTask task1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        this.shouldThrowIfAddingTasksWithSameId((Task)task1, (Task)task2);
    }

    @Test
    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        this.shouldThrowIfAddingTasksWithSameId((Task)task1, (Task)task2);
    }

    @Test
    public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        this.shouldThrowIfAddingTasksWithSameId((Task)task2, (Task)task1);
    }

    private void shouldThrowIfAddingTasksWithSameId(Task task1, Task task2) throws Exception {
        this.stateUpdater.start();
        this.stateUpdater.add(task1);
        this.stateUpdater.add(task2);
        this.verifyFailedTasks(IllegalStateException.class, task1);
    }

    @Test
    public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).inState(Task.State.RESTORING).build();
        this.shouldImmediatelyAddStatelessTasksToRestoredTasks(task1);
    }

    @Test
    public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_2).inState(Task.State.RESTORING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_1_0).inState(Task.State.RESTORING).build();
        this.shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3);
    }

    private void shouldImmediatelyAddStatelessTasksToRestoredTasks(StreamTask ... tasks) throws Exception {
        this.stateUpdater.start();
        for (StreamTask task : tasks) {
            this.stateUpdater.add((Task)task);
        }
        this.verifyRestoredActiveTasks(tasks);
        this.verifyNeverCheckpointTasks((Task[])tasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
    }

    @Test
    public void shouldRestoreSingleActiveStatefulTask() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyRestoredActiveTasks(task);
        this.verifyCheckpointTasks(true, new Task[]{task});
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task.changelogPartitions(), task.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task.changelogPartitions());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.atLeast((int)3))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false).thenReturn((Object)false).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.verifyRestoredActiveTasks(task3, task1, task2);
        this.verifyCheckpointTasks(true, new Task[]{task3, task1, task2});
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task1.changelogPartitions(), task1.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task2.changelogPartitions(), task2.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task3.changelogPartitions(), task3.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task1.changelogPartitions());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task2.changelogPartitions());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task3.changelogPartitions());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)3))).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.atLeast((int)4))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskAdded() {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        this.stateUpdater.add((Task)task);
        Assertions.assertTrue((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskUpdating() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[]{task});
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        Assertions.assertTrue((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskRestored() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyRestoredActiveTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        Assertions.assertTrue((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskRemoved() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.stateUpdater.remove(task.id());
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[]{task});
        this.verifyPausedTasks(new Task[0]);
        Assertions.assertTrue((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskFailed() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet((Object[])new TaskId[]{task.id()}));
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{taskCorruptedException}).when((Object)this.changelogReader)).restore(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)TASK_0_0, (Object)task)}));
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task}), (RuntimeException)taskCorruptedException));
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        Assertions.assertTrue((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfTaskPaused() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyUpdatingTasks(new Task[]{task});
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[]{task});
        Assertions.assertTrue((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnFalseForRestoreActiveTasksIfTaskRemovedFromStateUpdater() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(30000L));
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        Assertions.assertFalse((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldReturnTrueForRestoreActiveTasksIfStandbyTask() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[]{task});
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        Assertions.assertFalse((boolean)this.stateUpdater.restoresActiveTasks());
    }

    @Test
    public void shouldDrainRestoredActiveTasks() throws Exception {
        Assertions.assertTrue((boolean)this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).inState(Task.State.RESTORING).build();
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.verifyDrainingRestoredActiveTasks(task1);
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_1_1).inState(Task.State.RESTORING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_1_0).inState(Task.State.RESTORING).build();
        StreamTask task4 = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_2).inState(Task.State.RESTORING).build();
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        this.verifyDrainingRestoredActiveTasks(task2, task3, task4);
    }

    @Test
    public void shouldUpdateSingleStandbyTask() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        this.shouldUpdateStandbyTasks(task);
    }

    @Test
    public void shouldUpdateMultipleStandbyTasks() throws Exception {
        StandbyTask task1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        StandbyTask task3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        this.shouldUpdateStandbyTasks(task1, task2, task3);
    }

    private void shouldUpdateStandbyTasks(StandbyTask ... tasks) throws Exception {
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        for (StandbyTask task : tasks) {
            this.stateUpdater.add((Task)task);
        }
        this.verifyUpdatingStandbyTasks(tasks);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        for (StandbyTask task : tasks) {
            ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task.changelogPartitions(), task.stateManager());
        }
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).transitToUpdateStandby();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.timeout((long)30000L).atLeast(1))).restore(ArgumentMatchers.anyMap());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).enforceRestoreActive();
    }

    @Test
    public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask task3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        StandbyTask task4 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        this.verifyRestoredActiveTasks(task2, task1);
        this.verifyCheckpointTasks(true, new Task[]{task2, task1});
        this.verifyUpdatingStandbyTasks(task4, task3);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task1.changelogPartitions(), task1.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task2.changelogPartitions(), task2.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task3.changelogPartitions(), task3.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).register(task4.changelogPartitions(), task4.stateManager());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.atLeast((int)3))).restore(ArgumentMatchers.anyMap());
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader, task1, task2});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)2))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader)).transitToUpdateStandby();
    }

    @Test
    public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.verifyRestoredActiveTasks(task1);
        this.verifyCheckpointTasks(true, new Task[]{task1});
        this.verifyUpdatingStandbyTasks(task2);
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
        this.stateUpdater.add((Task)task3);
        this.verifyRestoredActiveTasks(task1, task3);
        this.verifyCheckpointTasks(true, new Task[]{task3});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet((Object[])new TaskId[]{activeTask1.id(), activeTask2.id()}));
        Map updatingTasks1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1), Utils.mkEntry((Object)activeTask2.id(), (Object)activeTask2), Utils.mkEntry((Object)standbyTask.id(), (Object)standbyTask)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{taskCorruptedException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks1);
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{activeTask1, activeTask2}), (RuntimeException)taskCorruptedException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.atLeast((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader)).transitToUpdateStandby();
    }

    @Test
    public void shouldNotTransitToStandbyAgainAfterStandbyTaskFailed() throws Exception {
        StandbyTask task1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2)});
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet((Object[])new TaskId[]{task1.id()}));
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1}), (RuntimeException)taskCorruptedException);
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{taskCorruptedException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyUpdatingTasks(new Task[]{activeTask1, activeTask2, standbyTask});
        this.stateUpdater.remove(activeTask1.id());
        this.stateUpdater.remove(activeTask2.id());
        this.verifyRemovedTasks(new Task[]{activeTask1, activeTask2});
        InOrder orderVerifier = Mockito.inOrder((Object[])new Object[]{this.changelogReader});
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader, Mockito.atLeast((int)1))).enforceRestoreActive();
        ((ChangelogReader)orderVerifier.verify((Object)this.changelogReader)).transitToUpdateStandby();
    }

    @Test
    public void shouldNotSwitchTwiceToUpdatingStandbyTaskIfStandbyTaskIsRemoved() throws Exception {
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)standbyTask2);
        this.verifyUpdatingTasks(new Task[]{standbyTask1, standbyTask2});
        this.stateUpdater.remove(standbyTask2.id());
        this.verifyRemovedTasks(new Task[]{standbyTask2});
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).transitToUpdateStandby();
    }

    @Test
    public void shouldRemoveActiveStatefulTask() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldRemoveStatefulTask((Task)task);
    }

    @Test
    public void shouldRemoveStandbyTask() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldRemoveStatefulTask((Task)task);
    }

    private void shouldRemoveStatefulTask(Task task) throws Exception {
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.remove(task.id());
        this.verifyRemovedTasks(task);
        this.verifyCheckpointTasks(true, task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task.changelogPartitions());
    }

    @Test
    public void shouldRemovePausedTask() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.verifyUpdatingTasks(new Task[]{task1, task2});
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(new Task[]{task1, task2});
        this.verifyRemovedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.stateUpdater.remove(task1.id());
        this.stateUpdater.remove(task2.id());
        this.verifyRemovedTasks(new Task[]{task1, task2});
        this.verifyPausedTasks(new Task[0]);
        this.verifyCheckpointTasks(true, new Task[]{task1, task2});
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task1.changelogPartitions());
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader)).unregister((Collection)task2.changelogPartitions());
    }

    @Test
    public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldNotRemoveTaskFromRestoredActiveTasks(task);
    }

    @Test
    public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statelessTask(TASK_0_0).inState(Task.State.RESTORING).build();
        this.shouldNotRemoveTaskFromRestoredActiveTasks(task);
    }

    private void shouldNotRemoveTaskFromRestoredActiveTasks(StreamTask task) throws Exception {
        StreamTask controlTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.stateUpdater.add((Task)controlTask);
        this.verifyRestoredActiveTasks(task);
        this.stateUpdater.remove(task.id());
        this.stateUpdater.remove(controlTask.id());
        this.verifyRemovedTasks(new Task[]{controlTask});
        this.verifyRestoredActiveTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldNotRemoveTaskFromFailedTasks((Task)task);
    }

    @Test
    public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldNotRemoveTaskFromFailedTasks((Task)task);
    }

    private void shouldNotRemoveTaskFromFailedTasks(Task task) throws Exception {
        StreamTask controlTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StreamsException streamsException = new StreamsException("Something happened", task.id());
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task.id(), (Object)task), Utils.mkEntry((Object)controlTask.id(), (Object)controlTask)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.add((Task)controlTask);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task}), (RuntimeException)streamsException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.stateUpdater.remove(task.id());
        this.stateUpdater.remove(controlTask.id());
        this.verifyRemovedTasks(new Task[]{controlTask});
        this.verifyPausedTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
    }

    @Test
    public void shouldPauseActiveStatefulTask() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldPauseStatefulTask((Task)task);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).transitToUpdateStandby();
    }

    @Test
    public void shouldPauseStandbyTask() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldPauseStatefulTask((Task)task);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_A_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_B_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.topologyMetadata.isPaused(task1.id().topologyName())).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.verifyPausedTasks(new Task[]{task1});
        this.verifyCheckpointTasks(true, new Task[]{task1});
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[]{task2});
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)1))).enforceRestoreActive();
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)1))).transitToUpdateStandby();
    }

    @Test
    public void shouldPauseStandbyTaskAndNotTransitToRestoreActive() throws Exception {
        StandbyTask task1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_A_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_B_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.topologyMetadata.isPaused(task1.id().topologyName())).thenReturn((Object)false).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.verifyPausedTasks(new Task[]{task1});
        this.verifyUpdatingTasks(new Task[]{task2});
        this.verifyCheckpointTasks(true, new Task[]{task1});
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.never())).enforceRestoreActive();
    }

    private void shouldPauseStatefulTask(Task task) throws Exception {
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.verifyUpdatingTasks(task);
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(task);
        this.verifyCheckpointTasks(true, task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotPausingNonExistTasks() throws Exception {
        this.stateUpdater.start();
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask controlTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.stateUpdater.add((Task)controlTask);
        this.verifyRestoredActiveTasks(task);
        this.verifyUpdatingTasks(new Task[]{controlTask});
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(new Task[]{controlTask});
        this.verifyRestoredActiveTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldNotPauseTaskInFailedTasks((Task)task);
    }

    @Test
    public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldNotPauseTaskInFailedTasks((Task)task);
    }

    private void shouldNotPauseTaskInFailedTasks(Task task) throws Exception {
        StreamTask controlTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StreamsException streamsException = new StreamsException("Something happened", task.id());
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task.id(), (Object)task), Utils.mkEntry((Object)controlTask.id(), (Object)controlTask)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.add((Task)controlTask);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task}), (RuntimeException)streamsException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[]{controlTask});
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(new Task[]{controlTask});
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
    }

    @Test
    public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldNotPauseTaskInRemovedTasks((Task)task);
    }

    @Test
    public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldNotPauseTaskInRemovedTasks((Task)task);
    }

    private void shouldNotPauseTaskInRemovedTasks(Task task) throws Exception {
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.remove(task.id());
        this.verifyRemovedTasks(task);
        this.verifyCheckpointTasks(true, task);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyRemovedTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
    }

    @Test
    public void shouldResumeActiveStatefulTask() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldResumeStatefulTask((Task)task);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)2))).enforceRestoreActive();
    }

    @Test
    public void shouldResumeStandbyTask() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldResumeStatefulTask((Task)task);
        ((ChangelogReader)Mockito.verify((Object)this.changelogReader, (VerificationMode)Mockito.times((int)2))).transitToUpdateStandby();
    }

    private void shouldResumeStatefulTask(Task task) throws Exception {
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.verifyUpdatingTasks(task);
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)false);
        this.stateUpdater.signalResume();
        this.verifyPausedTasks(new Task[0]);
        this.verifyUpdatingTasks(task);
    }

    @Test
    public void shouldNotResumeNonExistingTasks() throws Exception {
        this.stateUpdater.start();
        this.verifyPausedTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask controlTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task);
        this.stateUpdater.add((Task)controlTask);
        this.verifyRestoredActiveTasks(task);
        this.verifyPausedTasks(new Task[0]);
        this.verifyRestoredActiveTasks(task);
        this.verifyUpdatingTasks(new Task[]{controlTask});
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
    }

    @Test
    public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldNotPauseTaskInRemovedTasks((Task)task);
    }

    @Test
    public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldNotResumeTaskInRemovedTasks((Task)task);
    }

    private void shouldNotResumeTaskInRemovedTasks(Task task) throws Exception {
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.verifyUpdatingTasks(task);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.stateUpdater.remove(task.id());
        this.verifyRemovedTasks(task);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
    }

    @Test
    public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
        StreamTask task = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        this.shouldNotPauseTaskInFailedTasks((Task)task);
    }

    @Test
    public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
        StandbyTask task = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.shouldNotResumeTaskInFailedTasks((Task)task);
    }

    private void shouldNotResumeTaskInFailedTasks(Task task) throws Exception {
        StreamTask controlTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StreamsException streamsException = new StreamsException("Something happened", task.id());
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task.id(), (Object)task), Utils.mkEntry((Object)controlTask.id(), (Object)controlTask)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add(task);
        this.stateUpdater.add((Task)controlTask);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task}), (RuntimeException)streamsException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[]{controlTask});
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[]{controlTask});
    }

    @Test
    public void shouldDrainRemovedTasks() throws Exception {
        Assertions.assertFalse((boolean)this.stateUpdater.hasRemovedTasks());
        Assertions.assertTrue((boolean)this.stateUpdater.drainRemovedTasks().isEmpty());
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.remove(task1.id());
        this.verifyDrainingRemovedTasks(new Task[]{task1});
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RESTORING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task4 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RESTORING).build();
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.remove(task2.id());
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.remove(task3.id());
        this.stateUpdater.add((Task)task4);
        this.stateUpdater.remove(task4.id());
        this.verifyDrainingRemovedTasks(new Task[]{task2, task3, task4});
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        String exceptionMessage = "The Streams were crossed!";
        StreamsException streamsException = new StreamsException("The Streams were crossed!");
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2)});
        ((ChangelogReader)Mockito.doReturn((Object)0L).doThrow(new Throwable[]{streamsException}).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1, task2}), (RuntimeException)streamsException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask task3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        String exceptionMessage = "The Streams were crossed!";
        StreamsException streamsException1 = new StreamsException("The Streams were crossed!", task1.id());
        StreamsException streamsException2 = new StreamsException("The Streams were crossed!", task3.id());
        Map updatingTasksBeforeFirstThrow = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3)});
        Map updatingTasksBeforeSecondThrow = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3)});
        ((ChangelogReader)Mockito.doReturn((Object)0L).doThrow(new Throwable[]{streamsException1}).when((Object)this.changelogReader)).restore(updatingTasksBeforeFirstThrow);
        ((ChangelogReader)Mockito.doReturn((Object)0L).doThrow(new Throwable[]{streamsException2}).when((Object)this.changelogReader)).restore(updatingTasksBeforeSecondThrow);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1}), (RuntimeException)streamsException1);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task3}), (RuntimeException)streamsException2);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2);
        this.verifyUpdatingTasks(new Task[]{task2});
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RESTORING).build();
        Set expectedTaskIds = Utils.mkSet((Object[])new TaskId[]{task1.id(), task2.id()});
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3)});
        ((ChangelogReader)Mockito.doReturn((Object)0L).doThrow(new Throwable[]{taskCorruptedException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1, task2}), (RuntimeException)taskCorruptedException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[]{task3});
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
    }

    @Test
    public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask task2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RUNNING).build();
        IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
        Map updatingTasks = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1), Utils.mkEntry((Object)task2.id(), (Object)task2)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{illegalStateException}).when((Object)this.changelogReader)).restore(updatingTasks);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1, task2}), (RuntimeException)illegalStateException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
        this.verifyUpdatingTasks(new Task[0]);
        this.verifyRestoredActiveTasks(new StreamTask[0]);
        this.verifyRemovedTasks(new Task[0]);
        this.verifyPausedTasks(new Task[0]);
    }

    @Test
    public void shouldDrainFailedTasksAndExceptions() throws Exception {
        Assertions.assertFalse((boolean)this.stateUpdater.hasExceptionsAndFailedTasks());
        Assertions.assertTrue((boolean)this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RESTORING).build();
        StreamTask task3 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task4 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RESTORING).build();
        String exceptionMessage = "The Streams were crossed!";
        StreamsException streamsException1 = new StreamsException("The Streams were crossed!", task1.id());
        Map updatingTasks1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task1.id(), (Object)task1)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException1}).when((Object)this.changelogReader)).restore(updatingTasks1);
        StreamsException streamsException2 = new StreamsException("The Streams were crossed!", task2.id());
        StreamsException streamsException3 = new StreamsException("The Streams were crossed!", task3.id());
        StreamsException streamsException4 = new StreamsException("The Streams were crossed!", task4.id());
        Map updatingTasks2 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task2.id(), (Object)task2), Utils.mkEntry((Object)task3.id(), (Object)task3), Utils.mkEntry((Object)task4.id(), (Object)task4)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException2}).when((Object)this.changelogReader)).restore(updatingTasks2);
        Map updatingTasks3 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task3.id(), (Object)task3), Utils.mkEntry((Object)task4.id(), (Object)task4)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException3}).when((Object)this.changelogReader)).restore(updatingTasks3);
        Map updatingTasks4 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task4.id(), (Object)task4)});
        ((ChangelogReader)Mockito.doThrow((Throwable[])new Throwable[]{streamsException4}).when((Object)this.changelogReader)).restore(updatingTasks4);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task1}), (RuntimeException)streamsException1);
        this.verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task2}), (RuntimeException)streamsException2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks3 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task3}), (RuntimeException)streamsException3);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks4 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{task4}), (RuntimeException)streamsException4);
        this.verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
    }

    @Test
    public void shouldAutoCheckpointTasksOnInterval() throws Exception {
        StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask task3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        StandbyTask task4 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)task1);
        this.stateUpdater.add((Task)task2);
        this.stateUpdater.add((Task)task3);
        this.stateUpdater.add((Task)task4);
        this.verifyUpdatingTasks(new Task[]{task1, task2, task3, task4});
        this.time.sleep(101L);
        this.verifyExceptionsAndFailedTasks(new StateUpdater.ExceptionAndTasks[0]);
        this.verifyCheckpointTasks(false, new Task[]{task1, task2, task3, task4});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
        MockTime time = new MockTime();
        DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", this.metrics, this.config, this.changelogReader, this.topologyMetadata, (Time)time);
        try {
            StreamTask task1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
            StreamTask task2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
            StandbyTask task3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
            StandbyTask task4 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
            Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
            Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
            stateUpdater.start();
            stateUpdater.add((Task)task1);
            stateUpdater.add((Task)task2);
            stateUpdater.add((Task)task3);
            stateUpdater.add((Task)task4);
            time.sleep(100L);
            this.verifyNeverCheckpointTasks(new Task[]{task1, task2, task3, task4});
        }
        finally {
            stateUpdater.shutdown(Duration.ofMinutes(1L));
        }
    }

    private void verifyCheckpointTasks(boolean enforceCheckpoint, Task ... tasks) {
        for (Task task : tasks) {
            ((Task)Mockito.verify((Object)task, (VerificationMode)Mockito.timeout((long)30000L).atLeast(1))).maybeCheckpoint(enforceCheckpoint);
        }
    }

    private void verifyNeverCheckpointTasks(Task ... tasks) {
        for (Task task : tasks) {
            ((Task)Mockito.verify((Object)task, (VerificationMode)Mockito.never())).maybeCheckpoint(ArgumentMatchers.anyBoolean());
        }
    }

    @Test
    public void shouldGetTasksFromInputQueue() {
        this.stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_1})).inState(Task.State.RUNNING).build();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)standbyTask2);
        this.stateUpdater.remove(TASK_0_0);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask3);
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask1, activeTask2}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2, standbyTask3}));
    }

    @Test
    public void shouldGetTasksFromUpdatingTasks() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_C_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_1})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)standbyTask2);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask3);
        this.verifyUpdatingTasks(new Task[]{activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3});
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask1, activeTask2}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2, standbyTask3}));
    }

    @Test
    public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn((Object)Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0}));
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.verifyRestoredActiveTasks(activeTask1, activeTask2);
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask1, activeTask2}), Utils.mkSet((Object[])new StandbyTask[0]));
        this.stateUpdater.drainRestoredActiveTasks(Duration.ofMinutes(1L));
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[0]), Utils.mkSet((Object[])new StandbyTask[0]));
    }

    @Test
    public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_1})).inState(Task.State.RUNNING).build();
        TaskCorruptedException taskCorruptedException = new TaskCorruptedException(Utils.mkSet((Object[])new TaskId[]{standbyTask1.id(), standbyTask2.id()}));
        StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
        Map updatingTasks1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1), Utils.mkEntry((Object)standbyTask1.id(), (Object)standbyTask1), Utils.mkEntry((Object)standbyTask2.id(), (Object)standbyTask2)});
        ((ChangelogReader)Mockito.doReturn((Object)0L).doThrow(new Throwable[]{taskCorruptedException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks1);
        Map updatingTasks2 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1)});
        ((ChangelogReader)Mockito.doReturn((Object)0L).doThrow(new Throwable[]{streamsException}).doReturn((Object)0L).when((Object)this.changelogReader)).restore(updatingTasks2);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)standbyTask2);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{standbyTask1, standbyTask2}), (RuntimeException)taskCorruptedException);
        StateUpdater.ExceptionAndTasks expectedExceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Utils.mkSet((Object[])new Task[]{activeTask1}), (RuntimeException)streamsException);
        this.verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2);
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask1}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2}));
        this.stateUpdater.drainExceptionsAndFailedTasks();
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[0]), Utils.mkSet((Object[])new StandbyTask[0]));
    }

    @Test
    public void shouldGetTasksFromRemovedTasks() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_1_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask2 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_1_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_D_0})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask1 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_1})).inState(Task.State.RUNNING).build();
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)standbyTask1);
        this.stateUpdater.add((Task)activeTask);
        this.stateUpdater.add((Task)standbyTask2);
        this.stateUpdater.remove(standbyTask1.id());
        this.stateUpdater.remove(standbyTask2.id());
        this.stateUpdater.remove(activeTask.id());
        this.verifyRemovedTasks(new Task[]{activeTask, standbyTask1, standbyTask2});
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask1, standbyTask2}));
        this.stateUpdater.drainRemovedTasks();
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[0]), Utils.mkSet((Object[])new StandbyTask[0]));
    }

    @Test
    public void shouldGetTasksFromPausedTasks() throws Exception {
        StreamTask activeTask = StreamsTestUtils.TaskBuilder.statefulTask(TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask = StreamsTestUtils.TaskBuilder.standbyTask(TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RUNNING).build();
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask);
        this.stateUpdater.add((Task)standbyTask);
        this.verifyUpdatingTasks(new Task[]{activeTask, standbyTask});
        Mockito.when((Object)this.topologyMetadata.isPaused(null)).thenReturn((Object)true);
        this.verifyPausedTasks(new Task[]{activeTask, standbyTask});
        this.verifyGetTasks(Utils.mkSet((Object[])new StreamTask[]{activeTask}), Utils.mkSet((Object[])new StandbyTask[]{standbyTask}));
    }

    @Test
    public void shouldRecordMetrics() throws Exception {
        StreamTask activeTask1 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_A_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_0})).inState(Task.State.RESTORING).build();
        StreamTask activeTask2 = StreamsTestUtils.TaskBuilder.statefulTask(TASK_B_0_0, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_0})).inState(Task.State.RESTORING).build();
        StandbyTask standbyTask3 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_A_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_A_1})).inState(Task.State.RUNNING).build();
        StandbyTask standbyTask4 = StreamsTestUtils.TaskBuilder.standbyTask(TASK_B_0_1, Utils.mkSet((Object[])new TopicPartition[]{TOPIC_PARTITION_B_1})).inState(Task.State.RUNNING).build();
        Map tasks1234 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1), Utils.mkEntry((Object)activeTask2.id(), (Object)activeTask2), Utils.mkEntry((Object)standbyTask3.id(), (Object)standbyTask3), Utils.mkEntry((Object)standbyTask4.id(), (Object)standbyTask4)});
        Map tasks13 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)activeTask1.id(), (Object)activeTask1), Utils.mkEntry((Object)standbyTask3.id(), (Object)standbyTask3)});
        Mockito.when((Object)this.topologyMetadata.isPaused(activeTask2.id().topologyName())).thenReturn((Object)true);
        Mockito.when((Object)this.topologyMetadata.isPaused(standbyTask4.id().topologyName())).thenReturn((Object)true);
        Mockito.when((Object)this.changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.changelogReader.allChangelogsCompleted()).thenReturn((Object)false);
        Mockito.when((Object)this.changelogReader.restore(tasks1234)).thenReturn((Object)1L);
        Mockito.when((Object)this.changelogReader.restore(tasks13)).thenReturn((Object)1L);
        Mockito.when((Object)this.changelogReader.isRestoringActive()).thenReturn((Object)true);
        this.stateUpdater.start();
        this.stateUpdater.add((Task)activeTask1);
        this.stateUpdater.add((Task)activeTask2);
        this.stateUpdater.add((Task)standbyTask3);
        this.stateUpdater.add((Task)standbyTask4);
        this.verifyPausedTasks(new Task[]{activeTask2, standbyTask4});
        MatcherAssert.assertThat((Object)this.metrics.metrics().size(), (Matcher)Matchers.is((Object)11));
        LinkedHashMap<String, String> tagMap = new LinkedHashMap<String, String>();
        tagMap.put("thread-id", "test-state-updater");
        MetricName metricName = new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "The number of active tasks currently undergoing restoration", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.is((Object)1.0));
        metricName = new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "The number of standby tasks currently undergoing state update", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.is((Object)1.0));
        metricName = new MetricName("active-paused-tasks", "stream-state-updater-metrics", "The number of active tasks paused restoring", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.is((Object)1.0));
        metricName = new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "The number of standby tasks paused state update", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.is((Object)1.0));
        metricName = new MetricName("idle-ratio", "stream-state-updater-metrics", "The fraction of time the thread spent on being idle", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.greaterThanOrEqualTo((Comparable)Double.valueOf(0.0)));
        metricName = new MetricName("active-restore-ratio", "stream-state-updater-metrics", "The fraction of time the thread spent on restoring active tasks", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.greaterThanOrEqualTo((Comparable)Double.valueOf(0.0)));
        metricName = new MetricName("standby-update-ratio", "stream-state-updater-metrics", "The fraction of time the thread spent on updating standby tasks", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.is((Object)0.0));
        metricName = new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "The fraction of time the thread spent on checkpointing tasks restored progress", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.greaterThanOrEqualTo((Comparable)Double.valueOf(0.0)));
        metricName = new MetricName("restore-records-rate", "stream-state-updater-metrics", "The average per-second number of records restored", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.not((Object)0.0));
        metricName = new MetricName("restore-call-rate", "stream-state-updater-metrics", "The average per-second number of restore calls triggered", tagMap);
        DefaultStateUpdaterTest.verifyMetric(this.metrics, metricName, Matchers.not((Object)0.0));
        this.stateUpdater.shutdown(Duration.ofMinutes(1L));
        MatcherAssert.assertThat((Object)this.metrics.metrics().size(), (Matcher)Matchers.is((Object)1));
    }

    private static <T> void verifyMetric(Metrics metrics, MetricName metricName, Matcher<T> matcher) {
        MatcherAssert.assertThat((Object)((KafkaMetric)metrics.metrics().get(metricName)).metricName().description(), (Matcher)Matchers.is((Object)metricName.description()));
        MatcherAssert.assertThat((Object)((KafkaMetric)metrics.metrics().get(metricName)).metricName().tags(), (Matcher)Matchers.is((Object)metricName.tags()));
        MatcherAssert.assertThat((Object)((KafkaMetric)metrics.metrics().get(metricName)).metricValue(), matcher);
    }

    private void verifyGetTasks(Set<StreamTask> expectedActiveTasks, Set<StandbyTask> expectedStandbyTasks) {
        Set tasks = this.stateUpdater.getTasks();
        Assertions.assertEquals((int)(expectedActiveTasks.size() + expectedStandbyTasks.size()), (int)tasks.size());
        tasks.forEach(task -> Assertions.assertTrue((boolean)(task instanceof ReadOnlyTask)));
        Set actualTaskIds = tasks.stream().map(Task::id).collect(Collectors.toSet());
        HashSet<StreamTask> expectedTasks = new HashSet<StreamTask>(expectedActiveTasks);
        expectedTasks.addAll(expectedStandbyTasks);
        Set expectedTaskIds = expectedTasks.stream().map(Task::id).collect(Collectors.toSet());
        Assertions.assertTrue((boolean)actualTaskIds.containsAll(expectedTaskIds));
        Set activeTasks = this.stateUpdater.getActiveTasks();
        Assertions.assertEquals((int)expectedActiveTasks.size(), (int)activeTasks.size());
        Assertions.assertTrue((boolean)activeTasks.containsAll(expectedActiveTasks));
        Set standbyTasks = this.stateUpdater.getStandbyTasks();
        Assertions.assertEquals((int)expectedStandbyTasks.size(), (int)standbyTasks.size());
        Assertions.assertTrue((boolean)standbyTasks.containsAll(expectedStandbyTasks));
    }

    private void verifyRestoredActiveTasks(StreamTask ... tasks) throws Exception {
        if (tasks.length == 0) {
            TestUtils.waitForCondition(() -> this.stateUpdater.getRestoredActiveTasks().isEmpty(), (long)30000L, (String)"Did not get empty restored active task within the given timeout!");
        } else {
            Set expectedRestoredTasks = Utils.mkSet((Object[])tasks);
            HashSet restoredTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                restoredTasks.addAll(this.stateUpdater.getRestoredActiveTasks());
                return restoredTasks.containsAll(expectedRestoredTasks) && restoredTasks.size() == expectedRestoredTasks.size();
            }, (long)30000L, (String)"Did not get all restored active task within the given timeout!");
        }
    }

    private void verifyDrainingRestoredActiveTasks(StreamTask ... tasks) throws Exception {
        Set expectedRestoredTasks = Utils.mkSet((Object[])tasks);
        HashSet restoredTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            restoredTasks.addAll(this.stateUpdater.drainRestoredActiveTasks(Duration.ofMillis(1000L)));
            return restoredTasks.containsAll(expectedRestoredTasks) && restoredTasks.size() == expectedRestoredTasks.size();
        }, (long)30000L, (String)"Did not get all restored active task within the given timeout!");
        Assertions.assertTrue((boolean)this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
    }

    private void verifyUpdatingTasks(Task ... tasks) throws Exception {
        if (tasks.length == 0) {
            TestUtils.waitForCondition(() -> this.stateUpdater.getUpdatingTasks().isEmpty(), (long)30000L, (String)"Did not get empty updating task within the given timeout!");
        } else {
            Set expectedUpdatingTasks = Utils.mkSet((Object[])tasks);
            HashSet updatingTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                updatingTasks.addAll(this.stateUpdater.getUpdatingTasks());
                return updatingTasks.containsAll(expectedUpdatingTasks) && updatingTasks.size() == expectedUpdatingTasks.size();
            }, (long)30000L, (String)"Did not get all updating task within the given timeout!");
        }
    }

    private void verifyUpdatingStandbyTasks(StandbyTask ... tasks) throws Exception {
        Set expectedStandbyTasks = Utils.mkSet((Object[])tasks);
        HashSet standbyTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            standbyTasks.addAll(this.stateUpdater.getUpdatingStandbyTasks());
            return standbyTasks.containsAll(expectedStandbyTasks) && standbyTasks.size() == expectedStandbyTasks.size();
        }, (long)30000L, (String)"Did not see all standby task within the given timeout!");
    }

    private void verifyRemovedTasks(Task ... tasks) throws Exception {
        if (tasks.length == 0) {
            TestUtils.waitForCondition(() -> this.stateUpdater.getRemovedTasks().isEmpty(), (long)30000L, (String)"Did not get empty removed task within the given timeout!");
        } else {
            Set expectedRemovedTasks = Utils.mkSet((Object[])tasks);
            HashSet removedTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                removedTasks.addAll(this.stateUpdater.getRemovedTasks());
                return removedTasks.containsAll(expectedRemovedTasks) && removedTasks.size() == expectedRemovedTasks.size();
            }, (long)30000L, (String)"Did not get all removed task within the given timeout!");
        }
    }

    private void verifyPausedTasks(Task ... tasks) throws Exception {
        if (tasks.length == 0) {
            TestUtils.waitForCondition(() -> this.stateUpdater.getPausedTasks().isEmpty(), (long)30000L, (String)"Did not get empty paused task within the given timeout!");
        } else {
            Set expectedPausedTasks = Utils.mkSet((Object[])tasks);
            HashSet pausedTasks = new HashSet();
            TestUtils.waitForCondition(() -> {
                pausedTasks.addAll(this.stateUpdater.getPausedTasks());
                return pausedTasks.containsAll(expectedPausedTasks) && pausedTasks.size() == expectedPausedTasks.size();
            }, (long)30000L, (String)"Did not get all paused task within the given timeout!");
        }
    }

    private void verifyDrainingRemovedTasks(Task ... tasks) throws Exception {
        Set expectedRemovedTasks = Utils.mkSet((Object[])tasks);
        HashSet removedTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            if (this.stateUpdater.hasRemovedTasks()) {
                Set drainedTasks = this.stateUpdater.drainRemovedTasks();
                Assertions.assertFalse((boolean)drainedTasks.isEmpty());
                removedTasks.addAll(drainedTasks);
            }
            return removedTasks.containsAll(Utils.mkSet((Object[])tasks)) && removedTasks.size() == expectedRemovedTasks.size();
        }, (long)30000L, (String)"Did not get all restored active task within the given timeout!");
        Assertions.assertFalse((boolean)this.stateUpdater.hasRemovedTasks());
        Assertions.assertTrue((boolean)this.stateUpdater.drainRemovedTasks().isEmpty());
    }

    private void verifyExceptionsAndFailedTasks(StateUpdater.ExceptionAndTasks ... exceptionsAndTasks) throws Exception {
        List<StateUpdater.ExceptionAndTasks> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
        HashSet failedTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            failedTasks.addAll(this.stateUpdater.getExceptionsAndFailedTasks());
            return failedTasks.containsAll(expectedExceptionAndTasks) && failedTasks.size() == expectedExceptionAndTasks.size();
        }, (long)30000L, (String)"Did not get all exceptions and failed tasks within the given timeout!");
    }

    private void verifyFailedTasks(Class<? extends RuntimeException> clazz, Task ... tasks) throws Exception {
        List<Task> expectedFailedTasks = Arrays.asList(tasks);
        HashSet failedTasks = new HashSet();
        TestUtils.waitForCondition(() -> {
            for (StateUpdater.ExceptionAndTasks exceptionsAndTasks : this.stateUpdater.getExceptionsAndFailedTasks()) {
                if (!clazz.isInstance(exceptionsAndTasks.exception())) continue;
                failedTasks.addAll(exceptionsAndTasks.getTasks());
            }
            return failedTasks.containsAll(expectedFailedTasks) && failedTasks.size() == expectedFailedTasks.size();
        }, (long)30000L, (String)"Did not get all exceptions and failed tasks within the given timeout!");
    }

    private void verifyDrainingExceptionsAndFailedTasks(StateUpdater.ExceptionAndTasks ... exceptionsAndTasks) throws Exception {
        List<StateUpdater.ExceptionAndTasks> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
        ArrayList failedTasks = new ArrayList();
        TestUtils.waitForCondition(() -> {
            if (this.stateUpdater.hasExceptionsAndFailedTasks()) {
                List exceptionAndTasks = this.stateUpdater.drainExceptionsAndFailedTasks();
                Assertions.assertFalse((boolean)exceptionAndTasks.isEmpty());
                failedTasks.addAll(exceptionAndTasks);
            }
            return failedTasks.containsAll(expectedExceptionAndTasks) && failedTasks.size() == expectedExceptionAndTasks.size();
        }, (long)30000L, (String)"Did not get all exceptions and failed tasks within the given timeout!");
        Assertions.assertFalse((boolean)this.stateUpdater.hasExceptionsAndFailedTasks());
        Assertions.assertTrue((boolean)this.stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
    }
}

