/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedStandbyTasks;
import org.apache.kafka.streams.processor.internals.AssignedStreamsTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

/**
 * Unit tests for {@link TaskManager}. All collaborators are EasyMock mocks injected by
 * {@link EasyMockRunner}; a new {@code TaskManager} is built around them before every test.
 */
@RunWith(value=EasyMockRunner.class)
public class TaskManagerTest {
    // Single-task fixture shared by most tests: task 0_0 owning partition 0 of topic "t1".
    private final TaskId taskId0 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("t1", 0);
    private final Set<TopicPartition> taskId0Partitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0});
    private final Map<TaskId, Set<TopicPartition>> taskId0Assignment = Collections.singletonMap(this.taskId0, this.taskId0Partitions);
    // Strict mocks: any call that was not recorded beforehand fails the test.
    @Mock(type=MockType.STRICT)
    private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates;
    @Mock(type=MockType.STRICT)
    private InternalTopologyBuilder topologyBuilder;
    @Mock(type=MockType.STRICT)
    private StateDirectory stateDirectory;
    // Nice mocks: unrecorded calls are tolerated and return default values.
    @Mock(type=MockType.NICE)
    private ChangelogReader changeLogReader;
    @Mock(type=MockType.NICE)
    private StreamsMetadataState streamsMetadataState;
    @Mock(type=MockType.NICE)
    private Consumer<byte[], byte[]> restoreConsumer;
    @Mock(type=MockType.NICE)
    private Consumer<byte[], byte[]> consumer;
    @Mock(type=MockType.NICE)
    private StreamThread.AbstractTaskCreator<StreamTask> activeTaskCreator;
    @Mock(type=MockType.NICE)
    private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
    @Mock(type=MockType.NICE)
    private AdminClient adminClient;
    @Mock(type=MockType.NICE)
    private StreamTask streamTask;
    @Mock(type=MockType.NICE)
    private StandbyTask standbyTask;
    @Mock(type=MockType.NICE)
    private AssignedStreamsTasks active;
    @Mock(type=MockType.NICE)
    private AssignedStandbyTasks standby;
    // Object under test; re-created in setUp().
    private TaskManager taskManager;
    // NOTE(review): topic1/topic2 are not referenced anywhere in this class; the tests use the
    // literals "topic1"/"topic2" directly (likely constant inlining by the compiler - confirm).
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Partitions of "topic1"/"topic2" used by the subscription, purge and assignment tests.
    // Note that t1p0 above belongs to topic "t1", not "topic1".
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    // Task ids matching the directory names "0_1", "0_2", "0_3" and "1_1" created in the
    // cached-task-ids test.
    private final TaskId task01 = new TaskId(0, 1);
    private final TaskId task02 = new TaskId(0, 2);
    private final TaskId task03 = new TaskId(0, 3);
    private final TaskId task11 = new TaskId(1, 1);
    // Scratch directory supplied by JUnit for the test that creates task folders on disk.
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();

    /**
     * Builds a fresh {@link TaskManager} around the injected mocks and hands it the mocked
     * main consumer.
     */
    @Before
    public void setUp() {
        final UUID randomId = UUID.randomUUID();
        taskManager = new TaskManager(
            changeLogReader,
            randomId,
            "",
            restoreConsumer,
            streamsMetadataState,
            activeTaskCreator,
            standbyTaskCreator,
            adminClient,
            active,
            standby);
        taskManager.setConsumer(consumer);
    }

    /**
     * Switches the commonly used collaborator mocks from record mode to replay mode.
     * Mocks not listed here (e.g. the topology builder or state directory) must be replayed
     * by the individual test.
     */
    private void replay() {
        EasyMock.replay(
            changeLogReader,
            restoreConsumer,
            consumer,
            activeTaskCreator,
            standbyTaskCreator,
            active,
            standby,
            adminClient);
    }

    /**
     * With only "topic1" currently subscribed, an assignment covering "topic1" and "topic2"
     * must push the combined topic set to the topology builder exactly once.
     */
    @Test
    public void shouldUpdateSubscriptionFromAssignment() {
        mockTopologyBuilder();
        EasyMock.expect((Object)subscriptionUpdates.getUpdates()).andReturn((Object)Utils.mkSet("topic1"));
        topologyBuilder.updateSubscribedTopics((Set)EasyMock.eq((Object)Utils.mkSet("topic1", "topic2")), EasyMock.anyString());
        EasyMock.expectLastCall().once();

        final Object[] mocks = {activeTaskCreator, topologyBuilder, subscriptionUpdates};
        EasyMock.replay(mocks);

        taskManager.updateSubscriptionsFromAssignment(Arrays.asList(t1p1, t2p1));

        EasyMock.verify(mocks);
    }

    /**
     * When the assigned partitions' topics are already part of the current subscription,
     * no update may reach the (strict) topology builder mock.
     */
    @Test
    public void shouldNotUpdateSubscriptionFromAssignment() {
        mockTopologyBuilder();
        EasyMock.expect((Object)subscriptionUpdates.getUpdates()).andReturn((Object)Utils.mkSet("topic1", "topic2"));

        final Object[] mocks = {activeTaskCreator, topologyBuilder, subscriptionUpdates};
        EasyMock.replay(mocks);

        taskManager.updateSubscriptionsFromAssignment(Arrays.asList(t1p1));

        EasyMock.verify(mocks);
    }

    /**
     * With only "topic1" currently subscribed, metadata listing "topic1" and "topic2"
     * must push that topic set to the topology builder exactly once.
     */
    @Test
    public void shouldUpdateSubscriptionFromMetadata() {
        mockTopologyBuilder();
        EasyMock.expect((Object)subscriptionUpdates.getUpdates()).andReturn((Object)Utils.mkSet("topic1"));
        topologyBuilder.updateSubscribedTopics((Set)EasyMock.eq((Object)Utils.mkSet("topic1", "topic2")), EasyMock.anyString());
        EasyMock.expectLastCall().once();

        final Object[] mocks = {activeTaskCreator, topologyBuilder, subscriptionUpdates};
        EasyMock.replay(mocks);

        taskManager.updateSubscriptionsFromMetadata(Utils.mkSet("topic1", "topic2"));

        EasyMock.verify(mocks);
    }

    /**
     * When the metadata topics equal the current subscription, no update may reach the
     * (strict) topology builder mock.
     */
    @Test
    public void shouldNotUpdateSubscriptionFromMetadata() {
        mockTopologyBuilder();
        EasyMock.expect((Object)subscriptionUpdates.getUpdates()).andReturn((Object)Utils.mkSet("topic1"));

        final Object[] mocks = {activeTaskCreator, topologyBuilder, subscriptionUpdates};
        EasyMock.replay(mocks);

        taskManager.updateSubscriptionsFromMetadata(Utils.mkSet("topic1"));

        EasyMock.verify(mocks);
    }

    /**
     * Creates five directories ("0_1", "0_2", "0_3", "1_1", "dummy"), puts a ".checkpoint"
     * file into three of them, and expects exactly the task ids of those three to be reported.
     *
     * @throws IOException if a temporary folder or checkpoint file cannot be created
     */
    @Test
    public void shouldReturnCachedTaskIdsFromDirectory() throws IOException {
        final File[] taskFolders = {
            testFolder.newFolder("0_1"),
            testFolder.newFolder("0_2"),
            testFolder.newFolder("0_3"),
            testFolder.newFolder("1_1"),
            testFolder.newFolder("dummy")
        };
        // Only "0_1", "0_2" and "1_1" receive a checkpoint file.
        for (final int checkpointed : new int[]{0, 1, 3}) {
            Assert.assertTrue(new File(taskFolders[checkpointed], ".checkpoint").createNewFile());
        }

        EasyMock.expect((Object)activeTaskCreator.stateDirectory()).andReturn((Object)stateDirectory).once();
        EasyMock.expect((Object)stateDirectory.listTaskDirectories()).andReturn((Object)taskFolders).once();
        EasyMock.replay(activeTaskCreator, stateDirectory);

        final Set<?> cachedIds = taskManager.cachedTasksIds();

        EasyMock.verify(activeTaskCreator, stateDirectory);
        MatcherAssert.assertThat((Object)cachedIds, (Matcher)IsEqual.equalTo((Object)Utils.mkSet(task01, task02, task11)));
    }

    /**
     * Creating tasks must ask the active task container to close suspended tasks that are
     * not part of the active assignment.
     */
    @Test
    public void shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        mockSingleActiveTask();
        active.closeNonAssignedSuspendedTasks(taskId0Assignment);
        EasyMock.expectLastCall();
        replay();

        final Map<TaskId, Set<TopicPartition>> noStandbyTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(taskId0Assignment, noStandbyTasks);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(active);
    }

    /**
     * Creating tasks must ask the standby task container to close suspended tasks that are
     * not part of the standby assignment.
     *
     * <p>The assignment is installed as the <em>standby</em> assignment and the
     * <em>standby</em> mock is verified; the setup mirrors
     * {@code shouldAddNonResumedStandbyTasks}. Previously the test installed the assignment
     * as active and verified the {@code active} mock, which carried no expectation, so the
     * recorded standby expectation was never checked.
     */
    @Test
    public void shouldCloseStandbyUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        mockStandbyTaskExpectations();
        standby.closeNonAssignedSuspendedTasks(taskId0Assignment);
        EasyMock.expectLastCall();
        replay();

        final Map<TaskId, Set<TopicPartition>> noActiveTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(noActiveTasks, taskId0Assignment);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(standby);
    }

    /**
     * When the active container cannot resume a suspended task for the assignment, the task
     * produced by the active task creator must be added as a new task.
     */
    @Test
    public void shouldAddNonResumedActiveTasks() {
        mockSingleActiveTask();
        EasyMock.expect((Object)active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn((Object)false);
        active.addNewTask((Task)EasyMock.same((Object)streamTask));
        replay();

        final Map<TaskId, Set<TopicPartition>> noStandbyTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(taskId0Assignment, noStandbyTasks);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(activeTaskCreator, active);
    }

    /**
     * When the active container reports that the suspended task was resumed, only the
     * recorded resume call is expected on it; no task creation is recorded on the creator.
     */
    @Test
    public void shouldNotAddResumedActiveTasks() {
        EasyMock.checkOrder(active, true);
        EasyMock.expect((Object)active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn((Object)true);
        replay();

        final Map<TaskId, Set<TopicPartition>> noStandbyTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(taskId0Assignment, noStandbyTasks);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(active, activeTaskCreator);
    }

    /**
     * When the standby container cannot resume a suspended task for the assignment, the task
     * produced by the standby task creator must be added as a new standby task.
     *
     * <p>The {@code standby} mock, which carries the recorded expectations, is verified.
     * Previously the {@code active} mock was verified instead, so the {@code addNewTask}
     * expectation was never checked.
     */
    @Test
    public void shouldAddNonResumedStandbyTasks() {
        mockStandbyTaskExpectations();
        EasyMock.expect((Object)standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn((Object)false);
        standby.addNewTask((Task)EasyMock.same((Object)standbyTask));
        replay();

        final Map<TaskId, Set<TopicPartition>> noActiveTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(noActiveTasks, taskId0Assignment);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(standbyTaskCreator, standby);
    }

    /**
     * When the standby container reports that the suspended task was resumed, only the
     * recorded resume call is expected on it; no task creation is recorded on the creator.
     *
     * <p>Order checking is enabled on the {@code standby} mock, the one this test records
     * on and verifies, matching what the active variant does for {@code active}. Previously
     * it was enabled on {@code active}, which this test never records on or verifies.
     */
    @Test
    public void shouldNotAddResumedStandbyTasks() {
        EasyMock.checkOrder(standby, true);
        EasyMock.expect((Object)standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn((Object)true);
        replay();

        final Map<TaskId, Set<TopicPartition>> noActiveTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(noActiveTasks, taskId0Assignment);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(standby, standbyTaskCreator);
    }

    /**
     * Creating tasks for the active assignment must pause the assigned partitions on the
     * main consumer.
     */
    @Test
    public void shouldPauseActivePartitions() {
        mockSingleActiveTask();
        consumer.pause(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        final Map<TaskId, Set<TopicPartition>> noStandbyTasks = Collections.emptyMap();
        taskManager.setAssignmentMetadata(taskId0Assignment, noStandbyTasks);
        taskManager.createTasks(taskId0Partitions);

        EasyMock.verify(consumer);
    }

    /** Suspending tasks and state must call {@code suspend()} on the active container. */
    @Test
    public void shouldSuspendActiveTasks() {
        // A null return means the container reported no error.
        EasyMock.expect((Object)active.suspend()).andReturn(null);
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(active);
    }

    /** Suspending tasks and state must call {@code suspend()} on the standby container. */
    @Test
    public void shouldSuspendStandbyTasks() {
        // A null return means the container reported no error.
        EasyMock.expect((Object)standby.suspend()).andReturn(null);
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(standby);
    }

    /** Suspending tasks and state must unsubscribe the restore consumer. */
    @Test
    public void shouldUnassignChangelogPartitionsOnSuspend() {
        restoreConsumer.unsubscribe();
        EasyMock.expectLastCall();
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(restoreConsumer);
    }

    /**
     * If both task containers report an exception from {@code suspend()}, the task manager
     * must still call both of them and unsubscribe the restore consumer, and only then
     * fail with a {@link StreamsException}.
     */
    @Test
    public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
        EasyMock.expect((Object)active.suspend()).andReturn((Object)new RuntimeException(""));
        EasyMock.expect((Object)standby.suspend()).andReturn((Object)new RuntimeException(""));
        // expectLastCall() belongs after the void call it refers to; it previously preceded it.
        restoreConsumer.unsubscribe();
        EasyMock.expectLastCall();
        replay();

        try {
            taskManager.suspendTasksAndState();
            Assert.fail("Should have thrown streams exception");
        } catch (StreamsException expected) {
            // expected: the suspend failures are surfaced as a StreamsException
        }

        EasyMock.verify(restoreConsumer, active, standby);
    }

    /** {@code shutdown(true)} must forward {@code close(true)} to the active container. */
    @Test
    public void shouldCloseActiveTasksOnShutdown() {
        active.close(true);
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(active);
    }

    /** {@code shutdown(false)} must forward {@code close(false)} to the standby container. */
    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        standby.close(false);
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(false);

        EasyMock.verify(standby);
    }

    /** Shutting down must unsubscribe the restore consumer. */
    @Test
    public void shouldUnassignChangelogPartitionsOnShutdown() {
        restoreConsumer.unsubscribe();
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(restoreConsumer);
    }

    /**
     * {@code updateNewAndRestoringTasks()} must call {@code updateRestored(...)} on the
     * active container (any collection argument is accepted).
     */
    @Test
    public void shouldInitializeNewActiveTasks() {
        active.updateRestored((Collection)EasyMock.anyObject());
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(active);
    }

    /**
     * Runs {@code updateNewAndRestoringTasks()} and verifies both task containers.
     *
     * <p>The only recorded expectation is {@code active.updateRestored(...)}. Previously
     * only the {@code standby} mock was verified, so that expectation was never checked;
     * {@code active} is now verified as well.
     *
     * <p>NOTE(review): no expectation is recorded on {@code standby}, so the standby-side
     * initialization the test name refers to is still not asserted. Add the matching
     * standby expectation once the intended call is confirmed.
     */
    @Test
    public void shouldInitializeNewStandbyTasks() {
        active.updateRestored((Collection)EasyMock.anyObject());
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(active, standby);
    }

    /**
     * The partitions returned by the changelog reader's {@code restore(active)} must be
     * passed on to {@code active.updateRestored(...)}.
     */
    @Test
    public void shouldRestoreStateFromChangeLogReader() {
        EasyMock.expect((Object)changeLogReader.restore((RestoringTasks)active)).andReturn(taskId0Partitions);
        active.updateRestored(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(changeLogReader, active);
    }

    /**
     * Once all active tasks report running, the main consumer must resume the partitions
     * it reports as assigned.
     */
    @Test
    public void shouldResumeRestoredPartitions() {
        EasyMock.expect((Object)changeLogReader.restore((RestoringTasks)active)).andReturn(taskId0Partitions);
        EasyMock.expect((Object)active.allTasksRunning()).andReturn((Object)true);
        EasyMock.expect((Object)consumer.assignment()).andReturn(taskId0Partitions);
        EasyMock.expect((Object)standby.running()).andReturn(Collections.emptySet());
        consumer.resume(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(consumer);
    }

    /**
     * With all active tasks running and one running standby task, the restore consumer
     * must be assigned the standby partitions.
     */
    @Test
    public void shouldAssignStandbyPartitionsWhenAllActiveTasksAreRunning() {
        mockAssignStandbyPartitions(1L);
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    /**
     * {@code updateNewAndRestoringTasks()} must return {@code true} when both the active and
     * the standby container report that all tasks are running.
     */
    @Test
    public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() {
        mockAssignStandbyPartitions(1L);
        EasyMock.expect((Object)standby.allTasksRunning()).andReturn((Object)true);
        replay();

        final boolean allRunning = taskManager.updateNewAndRestoringTasks();

        Assert.assertTrue(allRunning);
    }

    /**
     * {@code updateNewAndRestoringTasks()} must return {@code false} when the active tasks
     * are all running but the standby container reports otherwise.
     */
    @Test
    public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() {
        mockAssignStandbyPartitions(1L);
        EasyMock.expect((Object)standby.allTasksRunning()).andReturn((Object)false);
        replay();

        final boolean allRunning = taskManager.updateNewAndRestoringTasks();

        Assert.assertFalse(allRunning);
    }

    /**
     * {@code updateNewAndRestoringTasks()} must return {@code false} while the active
     * container reports that not all tasks are running.
     */
    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        EasyMock.expect((Object)active.allTasksRunning()).andReturn((Object)false);
        replay();

        final boolean allRunning = taskManager.updateNewAndRestoringTasks();

        Assert.assertFalse(allRunning);
    }

    /**
     * A standby partition with a checkpointed offset of 1 must make the restore consumer
     * seek that partition to offset 1.
     */
    @Test
    public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
        final long checkpointedOffset = 1L;
        mockAssignStandbyPartitions(checkpointedOffset);
        restoreConsumer.seek(t1p0, checkpointedOffset);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    /**
     * A standby partition with a negative checkpointed offset must make the restore
     * consumer seek to the beginning of the standby partitions.
     */
    @Test
    public void shouldSeekToBeginningIfOffsetIsLessThan0() {
        mockAssignStandbyPartitions(-1L);
        restoreConsumer.seekToBeginning(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    /**
     * {@code commitAll()} must commit both containers and return the sum of their commit
     * counts (1 + 2 = 3).
     */
    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        EasyMock.expect((Object)active.commit()).andReturn((Object)1);
        EasyMock.expect((Object)standby.commit()).andReturn((Object)2);
        replay();

        MatcherAssert.assertThat((Object)taskManager.commitAll(), (Matcher)IsEqual.equalTo((Object)3));

        EasyMock.verify(active, standby);
    }

    /**
     * An exception thrown by the active container's {@code commit()} must propagate out of
     * {@code commitAll()}.
     */
    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        // NOTE(review): order checking is switched on for standby although nothing is
        // recorded on it; presumably meant to guard against standby calls. Confirm that
        // this actually rejects unexpected calls on a nice mock.
        EasyMock.checkOrder(standby, true);
        active.commit();
        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
        replay();

        try {
            taskManager.commitAll();
            Assert.fail("should have thrown first exception");
        } catch (Exception expected) {
            // expected: the commit failure reaches the caller
        }

        EasyMock.verify(active, standby);
    }

    /**
     * An exception thrown by the standby container's {@code commit()} must propagate out of
     * {@code commitAll()}.
     */
    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        EasyMock.expect((Object)standby.commit()).andThrow(new RuntimeException(""));
        replay();

        try {
            taskManager.commitAll();
            Assert.fail("should have thrown exception");
        } catch (Exception expected) {
            // expected: the commit failure reaches the caller
        }

        EasyMock.verify(standby);
    }

    /**
     * With the delete-records future already completed, two consecutive purge attempts
     * must each query the records to delete and each issue a delete request.
     */
    @Test
    public void shouldSendPurgeData() {
        final KafkaFutureImpl completedDelete = new KafkaFutureImpl();
        final Map<TopicPartition, RecordsToDelete> expectedRequest =
            Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteResult =
            new DeleteRecordsResult(Collections.singletonMap(t1p1, completedDelete));
        completedDelete.complete(null);

        EasyMock.expect((Object)active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
        EasyMock.expect((Object)adminClient.deleteRecords(expectedRequest)).andReturn((Object)deleteResult).times(2);
        replay();

        taskManager.maybePurgeCommitedRecords();
        taskManager.maybePurgeCommitedRecords();

        EasyMock.verify(active, adminClient);
    }

    /**
     * While the delete-records future of the first purge is still incomplete, a second
     * purge attempt must not query or send anything: both calls are expected exactly once.
     */
    @Test
    public void shouldNotSendPurgeDataIfPreviousNotDone() {
        // Never completed in this test.
        final KafkaFutureImpl pendingDelete = new KafkaFutureImpl();
        final Map<TopicPartition, RecordsToDelete> expectedRequest =
            Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteResult =
            new DeleteRecordsResult(Collections.singletonMap(t1p1, pendingDelete));

        EasyMock.expect((Object)active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
        EasyMock.expect((Object)adminClient.deleteRecords(expectedRequest)).andReturn((Object)deleteResult).once();
        replay();

        taskManager.maybePurgeCommitedRecords();
        taskManager.maybePurgeCommitedRecords();

        EasyMock.verify(active, adminClient);
    }

    /**
     * A delete-records future that completed exceptionally must not stop later purge
     * attempts: both purge calls run and each issues a delete request.
     */
    @Test
    public void shouldIgnorePurgeDataErrors() {
        final KafkaFutureImpl failedDelete = new KafkaFutureImpl();
        final Map<TopicPartition, RecordsToDelete> expectedRequest =
            Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteResult =
            new DeleteRecordsResult(Collections.singletonMap(t1p1, failedDelete));
        failedDelete.completeExceptionally(new Exception("KABOOM!"));

        EasyMock.expect((Object)active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
        EasyMock.expect((Object)adminClient.deleteRecords(expectedRequest)).andReturn((Object)deleteResult).times(2);
        replay();

        taskManager.maybePurgeCommitedRecords();
        taskManager.maybePurgeCommitedRecords();

        EasyMock.verify(active, adminClient);
    }

    /**
     * {@code maybeCommitActiveTasksPerUserRequested()} must return the count reported by
     * the active container.
     */
    @Test
    public void shouldMaybeCommitActiveTasks() {
        EasyMock.expect((Object)active.maybeCommitPerUserRequested()).andReturn((Object)5);
        replay();

        MatcherAssert.assertThat((Object)taskManager.maybeCommitActiveTasksPerUserRequested(), (Matcher)IsEqual.equalTo((Object)5));

        EasyMock.verify(active);
    }

    /** {@code process(0L)} must return the count reported by the active container. */
    @Test
    public void shouldProcessActiveTasks() {
        EasyMock.expect((Object)active.process(0L)).andReturn((Object)10);
        replay();

        MatcherAssert.assertThat((Object)taskManager.process(0L), (Matcher)IsEqual.equalTo((Object)10));

        EasyMock.verify(active);
    }

    /** {@code punctuate()} must return the count reported by the active container. */
    @Test
    public void shouldPunctuateActiveTasks() {
        EasyMock.expect((Object)active.punctuate()).andReturn((Object)20);
        replay();

        MatcherAssert.assertThat((Object)taskManager.punctuate(), (Matcher)IsEqual.equalTo((Object)20));

        EasyMock.verify(active);
    }

    /**
     * While the active container reports non-running tasks, the main consumer must not be
     * touched: a strict consumer mock with no recorded expectations replaces the default.
     */
    @Test
    public void shouldNotResumeConsumptionUntilAllStoresRestored() {
        EasyMock.expect((Object)active.allTasksRunning()).andReturn((Object)false);
        // Strict mock with no expectations: any call on it fails the test.
        final Consumer strictConsumer = (Consumer)EasyMock.createStrictMock(Consumer.class);
        taskManager.setConsumer(strictConsumer);
        EasyMock.replay(active, strictConsumer);

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(strictConsumer);
    }

    /**
     * The assignment metadata handed to the task manager must be what
     * {@code assignedActiveTasks()} and {@code assignedStandbyTasks()} report: empty at
     * first, then the populated maps after they are set again.
     *
     * <p>The maps are declared as {@code Map<TaskId, Set<TopicPartition>>}, the same shape
     * the other tests pass to {@code setAssignmentMetadata}. The previous
     * {@code HashMap<TaskId, HashSet<TopicPartition>>} declarations are not assignable to
     * that type because generic type arguments are invariant.
     */
    @Test
    public void shouldUpdateTasksFromPartitionAssignment() {
        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();

        taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
        Assert.assertTrue(taskManager.assignedActiveTasks().isEmpty());

        // Populate the maps, then hand them over a second time.
        activeTasks.put(task01, new HashSet<TopicPartition>(Arrays.asList(t1p1, t2p1)));
        activeTasks.put(task02, new HashSet<TopicPartition>(Arrays.asList(t1p2, t2p2)));
        standbyTasks.put(task03, new HashSet<TopicPartition>(Arrays.asList(t1p3, t2p3)));
        taskManager.setAssignmentMetadata(activeTasks, standbyTasks);

        MatcherAssert.assertThat((Object)taskManager.assignedActiveTasks(), (Matcher)IsEqual.equalTo(activeTasks));
        MatcherAssert.assertThat((Object)taskManager.assignedStandbyTasks(), (Matcher)IsEqual.equalTo(standbyTasks));
    }

    /**
     * Records the expectations shared by the standby-assignment tests: all active tasks
     * report running, one running standby task has checkpointed {@code offset} for
     * {@code t1p0}, and the restore consumer is assigned {@code taskId0Partitions}.
     * Only the locally created standby task mock is replayed here.
     *
     * @param offset checkpointed offset the standby task mock reports for {@code t1p0}
     */
    private void mockAssignStandbyPartitions(long offset) {
        final StandbyTask runningStandby = (StandbyTask)EasyMock.createNiceMock(StandbyTask.class);
        EasyMock.expect((Object)active.allTasksRunning()).andReturn((Object)true);
        EasyMock.expect((Object)standby.running()).andReturn(Collections.singletonList(runningStandby));
        EasyMock.expect((Object)runningStandby.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
        restoreConsumer.assign(taskId0Partitions);
        EasyMock.expectLastCall();
        EasyMock.replay(runningStandby);
    }

    /**
     * Makes the standby task creator return the single mocked standby task when asked to
     * create tasks for {@code taskId0Assignment} with any consumer.
     */
    private void mockStandbyTaskExpectations() {
        EasyMock.expect((Object)standbyTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(taskId0Assignment)))
            .andReturn(Collections.singletonList(standbyTask));
    }

    /**
     * Makes the active task creator return the single mocked stream task when asked to
     * create tasks for {@code taskId0Assignment} with any consumer.
     */
    private void mockSingleActiveTask() {
        EasyMock.expect((Object)activeTaskCreator.createTasks((Consumer)EasyMock.anyObject(), (Map)EasyMock.eq(taskId0Assignment)))
            .andReturn(Collections.singletonList(streamTask));
    }

    /**
     * Wires the topology builder mock for the subscription tests: the active task creator
     * hands it out any number of times, and it is expected to report the source topic
     * pattern "abc" and the mocked subscription updates once each.
     */
    private void mockTopologyBuilder() {
        EasyMock.expect((Object)activeTaskCreator.builder()).andReturn((Object)topologyBuilder).anyTimes();
        final Pattern sourcePattern = Pattern.compile("abc");
        EasyMock.expect((Object)topologyBuilder.sourceTopicPattern()).andReturn((Object)sourcePattern);
        EasyMock.expect((Object)topologyBuilder.subscriptionUpdates()).andReturn((Object)subscriptionUpdates);
    }
}

