/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SingleGroupPartitionGrouperStub;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StreamPartitionAssignorTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
    private final Set<String> allTopics = Utils.mkSet("topic1", "topic2");
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final TaskId task0 = new TaskId(0, 0);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);
    private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final StreamsConfig streamsConfig = new StreamsConfig(this.configProps());
    private final String userEndPoint = "localhost:8080";
    private final String applicationId = "stream-partition-assignor-test";
    private final TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);

    private Map<String, Object> configProps() {
        HashMap<String, Object> configurationMap = new HashMap<String, Object>();
        configurationMap.put("application.id", "stream-partition-assignor-test");
        configurationMap.put("bootstrap.servers", "localhost:8080");
        configurationMap.put("__task.manager.instance__", this.taskManager);
        return configurationMap;
    }

    private void configurePartitionAssignor(Map<String, Object> props) {
        Map<String, Object> configurationMap = this.configProps();
        configurationMap.putAll(props);
        this.partitionAssignor.configure(configurationMap);
    }

    private void mockTaskManager(Set<TaskId> prevTasks, Set<TaskId> cachedTasks, UUID processId, InternalTopologyBuilder builder) throws NoSuchFieldException, IllegalAccessException {
        EasyMock.expect((Object)this.taskManager.builder()).andReturn((Object)builder).anyTimes();
        EasyMock.expect(this.taskManager.prevActiveTaskIds()).andReturn(prevTasks).anyTimes();
        EasyMock.expect(this.taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes();
        EasyMock.expect((Object)this.taskManager.processId()).andReturn((Object)processId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.taskManager});
    }

    @Test
    public void shouldInterleaveTasksByGroupId() {
        TaskId taskIdA0 = new TaskId(0, 0);
        TaskId taskIdA1 = new TaskId(0, 1);
        TaskId taskIdA2 = new TaskId(0, 2);
        TaskId taskIdA3 = new TaskId(0, 3);
        TaskId taskIdB0 = new TaskId(1, 0);
        TaskId taskIdB1 = new TaskId(1, 1);
        TaskId taskIdB2 = new TaskId(1, 2);
        TaskId taskIdC0 = new TaskId(2, 0);
        TaskId taskIdC1 = new TaskId(2, 1);
        List<TaskId> expectedSubList1 = Arrays.asList(taskIdA0, taskIdA3, taskIdB2);
        List<TaskId> expectedSubList2 = Arrays.asList(taskIdA1, taskIdB0, taskIdC0);
        List<TaskId> expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1);
        List<List> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3);
        List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
        Collections.shuffle(tasks);
        List<List<TaskId>> interleavedTaskIds = this.partitionAssignor.interleaveTasksByGroupId(tasks, 3);
        Assert.assertThat(interleavedTaskIds, (Matcher)CoreMatchers.equalTo(embeddedList));
    }

    @Test
    public void testSubscription() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
        Set<TaskId> prevTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        Set<TaskId> cachedTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
        UUID processId = UUID.randomUUID();
        this.mockTaskManager(prevTasks, cachedTasks, processId, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        PartitionAssignor.Subscription subscription = this.partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
        Collections.sort(subscription.topics());
        Assert.assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
        HashSet<TaskId> standbyTasks = new HashSet<TaskId>(cachedTasks);
        standbyTasks.removeAll(prevTasks);
        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
        Assert.assertEquals((Object)info.encode(), (Object)subscription.userData());
    }

    @Test
    public void testAssignBasic() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2);
        Set<TaskId> prevTasks10 = Utils.mkSet(this.task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(this.task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(this.task2);
        Set<TaskId> standbyTasks10 = Utils.mkSet(this.task1);
        Set<TaskId> standbyTasks11 = Utils.mkSet(this.task2);
        Set<TaskId> standbyTasks20 = Utils.mkSet(this.task0);
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockTaskManager(prevTasks10, standbyTasks10, uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals(Utils.mkSet(Utils.mkSet(this.t1p0, this.t2p0), Utils.mkSet(this.t1p1, this.t2p1)), Utils.mkSet(new HashSet<TopicPartition>(assignments.get("consumer10").partitions()), new HashSet<TopicPartition>(assignments.get("consumer11").partitions())));
        Assert.assertEquals(Utils.mkSet(this.t1p2, this.t2p2), new HashSet<TopicPartition>(assignments.get("consumer20").partitions()));
        HashSet<TaskId> allActiveTasks = new HashSet<TaskId>();
        AssignmentInfo info10 = this.checkAssignment(this.allTopics, assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        AssignmentInfo info11 = this.checkAssignment(this.allTopics, assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        Assert.assertEquals(Utils.mkSet(this.task0, this.task1), allActiveTasks);
        AssignmentInfo info20 = this.checkAssignment(this.allTopics, assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals(allTasks, new HashSet(allActiveTasks));
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    @Test
    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
        this.builder.addProcessor("processorII", new MockProcessorSupplier(), "source2");
        List<PartitionInfo> localInfos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0]));
        Cluster localMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), localInfos, Collections.emptySet(), Collections.emptySet());
        List<String> topics = Utils.mkList("topic1", "topic2");
        TaskId taskIdA0 = new TaskId(0, 0);
        TaskId taskIdA1 = new TaskId(0, 1);
        TaskId taskIdA2 = new TaskId(0, 2);
        TaskId taskIdA3 = new TaskId(0, 3);
        TaskId taskIdB0 = new TaskId(1, 0);
        TaskId taskIdB1 = new TaskId(1, 1);
        TaskId taskIdB2 = new TaskId(1, 2);
        TaskId taskIdB3 = new TaskId(1, 3);
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(localMetadata, subscriptions);
        Assert.assertEquals(Utils.mkSet(Utils.mkSet(this.t2p2, this.t1p0, this.t1p2, this.t2p0), Utils.mkSet(this.t1p1, this.t2p1, this.t1p3, this.t2p3)), Utils.mkSet(new HashSet<TopicPartition>(assignments.get("consumer10").partitions()), new HashSet<TopicPartition>(assignments.get("consumer11").partitions())));
        AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
        List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
        Assert.assertEquals(expectedInfo10TaskIds, info10.activeTasks);
        AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
        List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
        Assert.assertEquals(expectedInfo11TaskIds, info11.activeTasks);
    }

    @Test
    public void testAssignWithPartialTopology() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        this.builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        this.builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");
        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2);
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.singletonMap("partition.grouper", SingleGroupPartitionGrouperStub.class));
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        HashSet<TaskId> allActiveTasks = new HashSet<TaskId>();
        AssignmentInfo info10 = this.checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals(allTasks, new HashSet(allActiveTasks));
    }

    @Test
    public void testAssignEmptyMetadata() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2);
        Set<TaskId> prevTasks10 = Utils.mkSet(this.task0);
        Set<TaskId> standbyTasks10 = Utils.mkSet(this.task1);
        Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(prevTasks10, standbyTasks10, uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(emptyMetadata, subscriptions);
        Assert.assertEquals(Collections.emptySet(), new HashSet<TopicPartition>(assignments.get("consumer10").partitions()));
        HashSet<TaskId> allActiveTasks = new HashSet<TaskId>();
        AssignmentInfo info10 = this.checkAssignment(Collections.emptySet(), assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        Assert.assertEquals((long)0L, (long)allActiveTasks.size());
        Assert.assertEquals(Collections.emptySet(), new HashSet(allActiveTasks));
        assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals(Utils.mkSet(Utils.mkSet(this.t1p0, this.t2p0, this.t1p0, this.t2p0, this.t1p1, this.t2p1, this.t1p2, this.t2p2)), Utils.mkSet(new HashSet<TopicPartition>(assignments.get("consumer10").partitions())));
        info10 = this.checkAssignment(this.allTopics, assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals(allTasks, new HashSet(allActiveTasks));
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    @Test
    public void testAssignWithNewTasks() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addSource(null, "source3", null, null, null, "topic3");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2, this.task3);
        Set<TaskId> prevTasks10 = Utils.mkSet(this.task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(this.task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(this.task2);
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockTaskManager(prevTasks10, Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        HashSet<TaskId> allActiveTasks = new HashSet<TaskId>();
        HashSet<TopicPartition> allPartitions = new HashSet<TopicPartition>();
        AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData());
        allActiveTasks.addAll(info.activeTasks);
        allPartitions.addAll(assignments.get("consumer10").partitions());
        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
        allActiveTasks.addAll(info.activeTasks);
        allPartitions.addAll(assignments.get("consumer11").partitions());
        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
        allActiveTasks.addAll(info.activeTasks);
        allPartitions.addAll(assignments.get("consumer20").partitions());
        Assert.assertEquals(allTasks, allActiveTasks);
        Assert.assertEquals(Utils.mkSet(this.t1p0, this.t1p1, this.t1p2, this.t2p0, this.t2p1, this.t2p2, this.t3p0, this.t3p1, this.t3p2, this.t3p3), allPartitions);
    }

    @Test
    public void testAssignWithStates() throws Exception {
        this.builder.setApplicationId("stream-partition-assignor-test");
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
        this.builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
        this.builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
        this.builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
        this.builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
        List<String> topics = Utils.mkList("topic1", "topic2");
        TaskId task00 = new TaskId(0, 0);
        TaskId task01 = new TaskId(0, 1);
        TaskId task02 = new TaskId(0, 2);
        TaskId task10 = new TaskId(1, 0);
        TaskId task11 = new TaskId(1, 1);
        TaskId task12 = new TaskId(1, 2);
        List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)2L, (long)assignments.get("consumer10").partitions().size());
        Assert.assertEquals((long)2L, (long)assignments.get("consumer11").partitions().size());
        Assert.assertEquals((long)2L, (long)assignments.get("consumer20").partitions().size());
        AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
        AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
        AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
        Assert.assertEquals((long)2L, (long)info10.activeTasks.size());
        Assert.assertEquals((long)2L, (long)info11.activeTasks.size());
        Assert.assertEquals((long)2L, (long)info20.activeTasks.size());
        HashSet<TaskId> allTasks = new HashSet<TaskId>();
        allTasks.addAll(info10.activeTasks);
        allTasks.addAll(info11.activeTasks);
        allTasks.addAll(info20.activeTasks);
        Assert.assertEquals(new HashSet<TaskId>(tasks), allTasks);
        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = this.builder.topicGroups();
        Assert.assertEquals(Utils.mkSet(task00, task01, task02), this.tasksForState("stream-partition-assignor-test", "store1", tasks, topicGroups));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), this.tasksForState("stream-partition-assignor-test", "store2", tasks, topicGroups));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), this.tasksForState("stream-partition-assignor-test", "store3", tasks, topicGroups));
    }

    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
        String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
        HashSet<TaskId> ids = new HashSet<TaskId>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
            if (!stateChangelogTopics.contains(changelogTopic)) continue;
            for (TaskId id : tasks) {
                if (id.topicGroupId != entry.getKey()) continue;
                ids.add(id);
            }
        }
        return ids;
    }

    @Test
    public void testAssignWithStandbyReplicas() throws Exception {
        Map<String, Object> props = this.configProps();
        props.put("num.standby.replicas", "1");
        StreamsConfig streamsConfig = new StreamsConfig(props);
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2);
        Set<TaskId> prevTasks00 = Utils.mkSet(this.task0);
        Set<TaskId> prevTasks01 = Utils.mkSet(this.task1);
        Set<TaskId> prevTasks02 = Utils.mkSet(this.task2);
        Set<TaskId> standbyTasks01 = Utils.mkSet(this.task1);
        Set<TaskId> standbyTasks02 = Utils.mkSet(this.task2);
        Set<TaskId> standbyTasks00 = Utils.mkSet(this.task0);
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockTaskManager(prevTasks00, standbyTasks01, uuid1, this.builder);
        this.configurePartitionAssignor(Collections.singletonMap("num.standby.replicas", 1));
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        HashSet<TaskId> allActiveTasks = new HashSet<TaskId>();
        HashSet<TaskId> allStandbyTasks = new HashSet<TaskId>();
        AssignmentInfo info10 = this.checkAssignment(this.allTopics, assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        allStandbyTasks.addAll(info10.standbyTasks.keySet());
        AssignmentInfo info11 = this.checkAssignment(this.allTopics, assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        allStandbyTasks.addAll(info11.standbyTasks.keySet());
        Assert.assertNotEquals((String)"same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
        Assert.assertEquals(Utils.mkSet(this.task0, this.task1), new HashSet(allActiveTasks));
        Assert.assertEquals(Utils.mkSet(this.task2), new HashSet(allStandbyTasks));
        AssignmentInfo info20 = this.checkAssignment(this.allTopics, assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        allStandbyTasks.addAll(info20.standbyTasks.keySet());
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
        Assert.assertEquals((long)3L, (long)allStandbyTasks.size());
        Assert.assertEquals(allTasks, allStandbyTasks);
    }

    @Test
    public void testOnAssignment() throws Exception {
        this.configurePartitionAssignor(Collections.emptyMap());
        List<TaskId> activeTaskList = Utils.mkList(this.task0, this.task3);
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        HashMap<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(this.t3p0, this.t3p3));
        activeTasks.put(this.task0, Utils.mkSet(this.t3p0));
        activeTasks.put(this.task3, Utils.mkSet(this.t3p3));
        standbyTasks.put(this.task1, Utils.mkSet(this.t3p1));
        standbyTasks.put(this.task2, Utils.mkSet(this.t3p2));
        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState);
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(this.t3p0, this.t3p3), info.encode());
        Capture capturedCluster = EasyMock.newCapture();
        this.taskManager.setPartitionsByHostState(hostState);
        EasyMock.expectLastCall();
        this.taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
        EasyMock.expectLastCall();
        this.taskManager.setClusterMetadata((Cluster)EasyMock.capture((Capture)capturedCluster));
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.taskManager});
        this.partitionAssignor.onAssignment(assignment);
        EasyMock.verify((Object[])new Object[]{this.taskManager});
        Assert.assertEquals(Collections.singleton(this.t3p0.topic()), ((Cluster)capturedCluster.getValue()).topics());
        Assert.assertEquals((long)2L, (long)((Cluster)capturedCluster.getValue()).partitionsForTopic(this.t3p0.topic()).size());
    }

    @Test
    public void testAssignWithInternalTopics() throws Exception {
        this.builder.setApplicationId("stream-partition-assignor-test");
        this.builder.addInternalTopic("topicX");
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        this.builder.addSink("sink1", "topicX", null, null, null, "processor1");
        this.builder.addSource(null, "source2", null, null, null, "topicX");
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        List<String> topics = Utils.mkList("topic1", "stream-partition-assignor-test-topicX");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2);
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager(internalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, "localhost:8080").encode()));
        this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)1L, (long)internalTopicManager.readyTopics.size());
        Assert.assertEquals((long)allTasks.size(), (long)internalTopicManager.readyTopics.get("stream-partition-assignor-test-topicX").intValue());
    }

    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
        String applicationId = "test";
        this.builder.setApplicationId(applicationId);
        this.builder.addInternalTopic("topicX");
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        this.builder.addSink("sink1", "topicX", null, null, null, "processor1");
        this.builder.addSource(null, "source2", null, null, null, "topicX");
        this.builder.addInternalTopic("topicZ");
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        this.builder.addSink("sink2", "topicZ", null, null, null, "processor2");
        this.builder.addSource(null, "source3", null, null, null, "topicZ");
        List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
        Set<TaskId> allTasks = Utils.mkSet(this.task0, this.task1, this.task2);
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.emptyMap());
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager(internalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, "localhost:8080").encode()));
        this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)2L, (long)internalTopicManager.readyTopics.size());
        Assert.assertEquals((long)allTasks.size(), (long)internalTopicManager.readyTopics.get("test-topicZ").intValue());
    }

    @Test
    public void shouldGenerateTasksForAllCreatedPartitions() throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");
        KStream<Object, Object> stream1 = builder.stream("topic1").map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>(){

            @Override
            public KeyValue<Object, Object> apply(Object key, Object value) {
                return new KeyValue<Object, Object>(key, value);
            }
        });
        KTable<Object, Long> table1 = builder.table("topic3").groupBy(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>(){

            @Override
            public KeyValue<Object, Object> apply(Object key, Object value) {
                return new KeyValue<Object, Object>(key, value);
            }
        }).count();
        stream1.join(table1, new ValueJoiner(){

            public Object apply(Object value1, Object value2) {
                return null;
            }
        });
        UUID uuid = UUID.randomUUID();
        String client = "client1";
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), internalTopologyBuilder);
        this.configurePartitionAssignor(Collections.emptyMap());
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("client1", new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic3"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignment = this.partitionAssignor.assign(this.metadata, subscriptions);
        HashMap<String, Integer> expectedCreatedInternalTopics = new HashMap<String, Integer>();
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 4);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog", 4);
        Assert.assertThat(mockInternalTopicManager.readyTopics, (Matcher)CoreMatchers.equalTo(expectedCreatedInternalTopics));
        List<TopicPartition> expectedAssignment = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2), new TopicPartition("topic3", 0), new TopicPartition("topic3", 1), new TopicPartition("topic3", 2), new TopicPartition("topic3", 3), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 0), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 1), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 2), new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 3), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 0), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 1), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 2), new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 3));
        Assert.assertThat(new HashSet<TopicPartition>(assignment.get("client1").partitions()), (Matcher)CoreMatchers.equalTo(new HashSet<TopicPartition>(expectedAssignment)));
    }

    @Test
    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
        this.builder.setApplicationId("stream-partition-assignor-test");
        this.builder.addSource(null, "source", null, null, null, "input");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        this.builder.addSink("sink", "output", null, null, null, "processor");
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.singletonMap("application.server", "localhost:8080"));
        PartitionAssignor.Subscription subscription = this.partitionAssignor.subscription(Utils.mkSet("input"));
        SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
        Assert.assertEquals((Object)"localhost:8080", (Object)subscriptionInfo.userEndPoint);
    }

    @Test
    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
        this.builder.setApplicationId("stream-partition-assignor-test");
        this.builder.addSource(null, "source", null, null, null, "topic1");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        this.builder.addSink("sink", "output", null, null, null, "processor");
        List<String> topics = Utils.mkList("topic1");
        UUID uuid1 = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, this.builder);
        this.configurePartitionAssignor(Collections.singletonMap("application.server", "localhost:8080"));
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
        AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
        Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
        Assert.assertEquals(Utils.mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)), topicPartitions);
    }

    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
        this.builder.setApplicationId("stream-partition-assignor-test");
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), this.builder);
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        try {
            this.configurePartitionAssignor(Collections.singletonMap("application.server", "localhost"));
            Assert.fail((String)"expected to an exception due to invalid config");
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
        this.builder.setApplicationId("stream-partition-assignor-test");
        try {
            this.configurePartitionAssignor(Collections.singletonMap("application.server", "localhost:j87yhk"));
            Assert.fail((String)"expected to an exception due to invalid config");
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");
        KStream<Object, Object> stream1 = builder.stream("topic1").selectKey(new KeyValueMapper<Object, Object, Object>(){

            @Override
            public Object apply(Object key, Object value) {
                return null;
            }
        }).groupByKey().count(Materialized.as("count")).toStream().map(new KeyValueMapper<Object, Long, KeyValue<Object, Object>>(){

            @Override
            public KeyValue<Object, Object> apply(Object key, Long value) {
                return null;
            }
        });
        builder.stream("unknownTopic").selectKey(new KeyValueMapper<Object, Object, Object>(){

            @Override
            public Object apply(Object key, Object value) {
                return null;
            }
        }).join(stream1, new ValueJoiner(){

            public Object apply(Object value1, Object value2) {
                return null;
            }
        }, JoinWindows.of(0L));
        UUID uuid = UUID.randomUUID();
        String client = "client1";
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), internalTopologyBuilder);
        this.configurePartitionAssignor(Collections.emptyMap());
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("client1", new PartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, "localhost:8080").encode()));
        Map<String, PartitionAssignor.Assignment> assignment = this.partitionAssignor.assign(this.metadata, subscriptions);
        HashMap<String, Integer> expectedCreatedInternalTopics = new HashMap<String, Integer>();
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-count-repartition", 3);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-count-changelog", 3);
        Assert.assertThat(mockInternalTopicManager.readyTopics, (Matcher)CoreMatchers.equalTo(expectedCreatedInternalTopics));
        List<TopicPartition> expectedAssignment = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2), new TopicPartition("stream-partition-assignor-test-count-repartition", 0), new TopicPartition("stream-partition-assignor-test-count-repartition", 1), new TopicPartition("stream-partition-assignor-test-count-repartition", 2));
        Assert.assertThat(new HashSet<TopicPartition>(assignment.get("client1").partitions()), (Matcher)CoreMatchers.equalTo(new HashSet<TopicPartition>(expectedAssignment)));
    }

    @Test
    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() throws Exception {
        TopicPartition partitionOne = new TopicPartition("topic", 1);
        TopicPartition partitionTwo = new TopicPartition("topic", 2);
        Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));
        this.configurePartitionAssignor(Collections.emptyMap());
        this.taskManager.setPartitionsByHostState(hostState);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.taskManager});
        this.partitionAssignor.onAssignment(this.createAssignment(hostState));
        EasyMock.verify((Object[])new Object[]{this.taskManager});
    }

    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
        internalTopologyBuilder.setApplicationId("stream-partition-assignor-test");
        builder.stream("topic1").groupByKey().count();
        UUID uuid = UUID.randomUUID();
        this.mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid, internalTopologyBuilder);
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("num.standby.replicas", 1);
        props.put("application.server", "localhost:8080");
        this.configurePartitionAssignor(props);
        this.partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer2", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()));
        Set<TopicPartition> allPartitions = Utils.mkSet(this.t1p0, this.t1p1, this.t1p2);
        Map<String, PartitionAssignor.Assignment> assign = this.partitionAssignor.assign(this.metadata, subscriptions);
        PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
        AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
        Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
        Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
        HashSet<TopicPartition> allAssignedPartitions = new HashSet<TopicPartition>(consumer1partitions);
        allAssignedPartitions.addAll(consumer2Partitions);
        Assert.assertThat(consumer1partitions, (Matcher)CoreMatchers.not(allPartitions));
        Assert.assertThat(consumer2Partitions, (Matcher)CoreMatchers.not(allPartitions));
        Assert.assertThat(allAssignedPartitions, (Matcher)CoreMatchers.equalTo(allPartitions));
    }

    @Test(expected=KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
        this.partitionAssignor.configure(Collections.singletonMap("num.standby.replicas", 1));
    }

    @Test(expected=KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("num.standby.replicas", 1);
        config.put("__task.manager.instance__", "i am not a stream thread");
        this.partitionAssignor.configure(config);
    }

    private PartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> firstHostState) {
        AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), firstHostState);
        return new PartitionAssignor.Assignment(Collections.emptyList(), info.encode());
    }

    private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        Assert.assertEquals((long)assignment.partitions().size(), (long)info.activeTasks.size());
        ArrayList<TaskId> activeTasks = new ArrayList<TaskId>();
        HashSet<String> activeTopics = new HashSet<String>();
        for (TopicPartition partition : assignment.partitions()) {
            activeTasks.add(new TaskId(0, partition.partition()));
            activeTopics.add(partition.topic());
        }
        Assert.assertEquals(activeTasks, info.activeTasks);
        Assert.assertEquals(expectedTopics, activeTopics);
        HashSet<String> standbyTopics = new HashSet<String>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
            TaskId id = entry.getKey();
            Set<TopicPartition> partitions = entry.getValue();
            for (TopicPartition partition : partitions) {
                Assert.assertEquals((long)id.partition, (long)partition.partition());
                standbyTopics.add(partition.topic());
            }
        }
        if (info.standbyTasks.size() > 0) {
            Assert.assertEquals(expectedTopics, standbyTopics);
        }
        return info;
    }
}

