/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
@Timeout(value=30000L)
public class StreamsPartitionAssignorTest {
    // Consumer (member) ids used as keys of the thread-level assignments built in the tests below.
    private static final String CONSUMER_1 = "consumer1";
    private static final String CONSUMER_2 = "consumer2";
    private static final String CONSUMER_3 = "consumer3";
    private static final String CONSUMER_4 = "consumer4";
    // The set {"topic1", "topic2"}.
    private final Set<String> allTopics = Utils.mkSet((Object[])new String[]{"topic1", "topic2"});
    // Topic-partition fixtures, named t<topic number>p<partition number>.
    // Note that partition 3 of topic1 and topic2 has no matching entry in `infos` below.
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
    // Partition metadata: topic1 and topic2 with partitions 0-2, topic3 with partitions 0-3,
    // each bound to a node and replica array taken from AssignmentTestUtils.
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0), new PartitionInfo("topic1", 1, AssignmentTestUtils.NODE_1, AssignmentTestUtils.REPLICA_1, AssignmentTestUtils.REPLICA_1), new PartitionInfo("topic1", 2, AssignmentTestUtils.NODE_2, AssignmentTestUtils.REPLICA_2, AssignmentTestUtils.REPLICA_2), new PartitionInfo("topic2", 0, AssignmentTestUtils.NODE_3, AssignmentTestUtils.REPLICA_3, AssignmentTestUtils.REPLICA_3), new PartitionInfo("topic2", 1, AssignmentTestUtils.NODE_4, AssignmentTestUtils.REPLICA_4, AssignmentTestUtils.REPLICA_4), new PartitionInfo("topic2", 2, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0), new PartitionInfo("topic3", 0, AssignmentTestUtils.NODE_1, AssignmentTestUtils.REPLICA_1, AssignmentTestUtils.REPLICA_1), new PartitionInfo("topic3", 1, AssignmentTestUtils.NODE_2, AssignmentTestUtils.REPLICA_2, AssignmentTestUtils.REPLICA_2), new PartitionInfo("topic3", 2, AssignmentTestUtils.NODE_3, AssignmentTestUtils.REPLICA_3, AssignmentTestUtils.REPLICA_3), new PartitionInfo("topic3", 3, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0));
    // Subscription info for PID_1 with empty task sets in both positions.
    private final SubscriptionInfo defaultSubscriptionInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);
    // Cluster named "cluster" made of the five test nodes and the partition metadata above;
    // the two trailing constructor arguments are empty sets.
    private final Cluster metadata = new Cluster("cluster", Arrays.asList(AssignmentTestUtils.NODE_0, AssignmentTestUtils.NODE_1, AssignmentTestUtils.NODE_2, AssignmentTestUtils.NODE_3, AssignmentTestUtils.NODE_4), this.infos, Collections.emptySet(), Collections.emptySet());
    // The assignor instance under test.
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    // Supplies the restore consumer handed to MockInternalTopicManager.
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    // USER_END_POINT is also used as the "bootstrap.servers" value in configProps.
    private static final String USER_END_POINT = "localhost:8080";
    private static final String OTHER_END_POINT = "other:9090";
    // Used both as the "application.id" config value and as the topology builder's application id.
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    // Assigned by createMockTaskManager; stays null until that helper runs.
    private TaskManager taskManager;
    // Assigned by setUp; configProps substitutes a plain Mockito mock while this is still null.
    private Admin adminClient;
    private InternalTopologyBuilder builder = new InternalTopologyBuilder();
    // (Re)created by setUp and by configurePartitionAssignorWith.
    private TopologyMetadata topologyMetadata;
    // Mock created via the @Mock annotation (the class is extended with MockitoExtension).
    @Mock
    private StreamsMetadataState streamsMetadataState;
    // Captor created via the @Captor annotation.
    @Captor
    private ArgumentCaptor<Map<TopicPartition, PartitionInfo>> topicPartitionInfoCaptor;
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<String, ConsumerPartitionAssignor.Subscription>();
    // Optional client tags; configProps falls back to AssignmentTestUtils.EMPTY_CLIENT_TAGS when null.
    private Map<String, String> clientTags;
    // Container placed into the config map under "__reference.container.instance__" by configProps.
    private final ReferenceContainer referenceContainer = new ReferenceContainer();
    private final MockTime time = new MockTime();
    // Value 1; the subscription tests pass the same byte value to AssignmentTestUtils.getInfo.
    private final byte uniqueField = 1;

    /**
     * Refreshes the shared {@link ReferenceContainer} from the current test fixtures and returns a
     * new, mutable configuration map that carries it.
     *
     * <p>A fresh {@link Consumer} mock is installed on every call. The admin client and client tags
     * fall back to a plain mock and {@code AssignmentTestUtils.EMPTY_CLIENT_TAGS} respectively when
     * the corresponding field is still {@code null}.
     *
     * @param parameterizedConfig per-test settings; copied in last, so they override the defaults
     * @return a new map with the application id, bootstrap servers, reference container and the
     *         parameterized settings
     */
    private Map<String, Object> configProps(Map<String, Object> parameterizedConfig) {
        ReferenceContainer container = referenceContainer;
        container.mainConsumer = Mockito.mock(Consumer.class);
        if (adminClient != null) {
            container.adminClient = adminClient;
        } else {
            container.adminClient = Mockito.mock(Admin.class);
        }
        container.taskManager = taskManager;
        container.streamsMetadataState = streamsMetadataState;
        container.time = time;
        if (clientTags != null) {
            container.clientTags = clientTags;
        } else {
            container.clientTags = AssignmentTestUtils.EMPTY_CLIENT_TAGS;
        }

        Map<String, Object> settings = new HashMap<>();
        settings.put("application.id", APPLICATION_ID);
        settings.put("bootstrap.servers", USER_END_POINT);
        settings.put("__reference.container.instance__", container);
        settings.putAll(parameterizedConfig);
        return settings;
    }

    /**
     * Creates the default mock task manager (empty active and standby task sets) and then
     * configures the partition assignor with only the parameterized settings.
     *
     * @param parameterizedConfig per-test settings
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configureDefault(Map<String, Object> parameterizedConfig) {
        createDefaultMockTaskManager();
        MockInternalTopicManager topicManager = configureDefaultPartitionAssignor(parameterizedConfig);
        return topicManager;
    }

    /**
     * Configures the partition assignor without any extra property overrides.
     *
     * @param parameterizedConfig per-test settings
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configureDefaultPartitionAssignor(Map<String, Object> parameterizedConfig) {
        Map<String, Object> noOverrides = Collections.emptyMap();
        return configurePartitionAssignorWith(noOverrides, parameterizedConfig);
    }

    /**
     * Configures the partition assignor with extra property overrides and no stubbed
     * topic-partition info.
     *
     * @param props               overrides applied on top of the base configuration
     * @param parameterizedConfig per-test settings
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configurePartitionAssignorWith(Map<String, Object> props, Map<String, Object> parameterizedConfig) {
        List<Map<String, List<TopicPartitionInfo>>> noTopicPartitionInfo = null;
        return configurePartitionAssignorWith(props, noTopicPartitionInfo, parameterizedConfig);
    }

    /**
     * Configures the partition assignor, rebuilds {@code topologyMetadata} and installs a mock
     * internal topic manager (created with {@code mockCreateInternalTopics == false}).
     *
     * @param props               overrides applied on top of the base configuration handed to the assignor
     * @param topicPartitionInfo  optional canned answers for {@code getTopicPartitionInfo}; may be {@code null}
     * @param parameterizedConfig per-test settings
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configurePartitionAssignorWith(Map<String, Object> props, List<Map<String, List<TopicPartitionInfo>>> topicPartitionInfo, Map<String, Object> parameterizedConfig) {
        Map<String, Object> assignorConfig = configProps(parameterizedConfig);
        assignorConfig.putAll(props);
        partitionAssignor.configure(assignorConfig);

        // The topology metadata is built from the base configuration only, without the overrides in props.
        StreamsConfig streamsConfig = new StreamsConfig(configProps(parameterizedConfig));
        topologyMetadata = new TopologyMetadata(builder, streamsConfig);

        return overwriteInternalTopicManagerWithMock(false, topicPartitionInfo, parameterizedConfig);
    }

    /**
     * Creates the mock task manager with empty active and standby task sets.
     */
    private void createDefaultMockTaskManager() {
        Set<TaskId> noActiveTasks = AssignmentTestUtils.EMPTY_TASKS;
        Set<TaskId> noStandbyTasks = AssignmentTestUtils.EMPTY_TASKS;
        createMockTaskManager(noActiveTasks, noStandbyTasks);
    }

    /**
     * Replaces {@code taskManager} with a Mockito mock that reports the current topology metadata,
     * the offset sums derived from the given tasks, and {@code AssignmentTestUtils.PID_1} as its
     * process id. Afterwards sets the application id on the builder and builds the topology.
     *
     * @param activeTasks  tasks passed as the first argument to {@code getTaskOffsetSums}
     * @param standbyTasks tasks passed as the second argument to {@code getTaskOffsetSums}
     */
    private void createMockTaskManager(Set<TaskId> activeTasks, Set<TaskId> standbyTasks) {
        TaskManager mockedTaskManager = Mockito.mock(TaskManager.class);
        taskManager = mockedTaskManager;

        // Stubbed leniently so unused stubs are not reported under the class's STRICT_STUBS setting.
        Mockito.lenient().when(mockedTaskManager.topologyMetadata()).thenReturn(topologyMetadata);
        Mockito.lenient().when(mockedTaskManager.getTaskOffsetSums()).thenReturn(getTaskOffsetSums(activeTasks, standbyTasks));
        Mockito.lenient().when(mockedTaskManager.processId()).thenReturn(AssignmentTestUtils.PID_1);

        builder.setApplicationId(APPLICATION_ID);
        topologyMetadata.buildAndRewriteTopology();
    }

    /**
     * Installs a mock internal topic manager without any stubbed topic-partition info.
     *
     * @param mockCreateInternalTopics forwarded to the {@link MockInternalTopicManager} constructor
     * @param parameterizedConfig      per-test settings
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager overwriteInternalTopicManagerWithMock(boolean mockCreateInternalTopics, Map<String, Object> parameterizedConfig) {
        List<Map<String, List<TopicPartitionInfo>>> noTopicPartitionInfo = null;
        return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, noTopicPartitionInfo, parameterizedConfig);
    }

    /**
     * Creates a Mockito spy around a new {@link MockInternalTopicManager} and installs it on the
     * partition assignor.
     *
     * <p>When {@code topicPartitionInfo} is given, {@code getTopicPartitionInfo} is leniently
     * stubbed: it answers with the first map whose key set equals the requested topic set, or an
     * empty map if none matches.
     *
     * @param mockCreateInternalTopics forwarded to the {@link MockInternalTopicManager} constructor
     * @param topicPartitionInfo       optional canned answers; may be {@code null}
     * @param parameterizedConfig      per-test settings
     * @return the spy that was installed on the assignor
     */
    private MockInternalTopicManager overwriteInternalTopicManagerWithMock(boolean mockCreateInternalTopics, List<Map<String, List<TopicPartitionInfo>>> topicPartitionInfo, Map<String, Object> parameterizedConfig) {
        StreamsConfig streamsConfig = new StreamsConfig(configProps(parameterizedConfig));
        MockInternalTopicManager topicManagerSpy = Mockito.spy(
            new MockInternalTopicManager(time, streamsConfig, mockClientSupplier.restoreConsumer, mockCreateInternalTopics));

        if (topicPartitionInfo != null) {
            Mockito.lenient()
                .when(topicManagerSpy.getTopicPartitionInfo(ArgumentMatchers.anySet()))
                .thenAnswer(invocation -> {
                    Set<String> requestedTopics = invocation.getArgument(0);
                    return topicPartitionInfo.stream()
                        .filter(candidate -> requestedTopics.equals(candidate.keySet()))
                        .findFirst()
                        .orElse(Collections.emptyMap());
                });
        }

        partitionAssignor.setInternalTopicManager(topicManagerSpy);
        return topicManagerSpy;
    }

    /**
     * Argument source for the parameterized tests: one configuration map per combination of
     * internal task assignor, custom task assignor and rack-aware assignment strategy listed below.
     *
     * @return a stream with one single-argument {@link Arguments} per configuration
     */
    static Stream<Arguments> parameter() {
        return Stream.of(
            Arguments.of(buildParameterizedConfig(HighAvailabilityTaskAssignor.class, null, "min_traffic")),
            Arguments.of(buildParameterizedConfig(HighAvailabilityTaskAssignor.class, null, "none")),
            Arguments.of(buildParameterizedConfig(LegacyStickyTaskAssignor.class, null, "min_traffic")),
            Arguments.of(buildParameterizedConfig(LegacyStickyTaskAssignor.class, null, "none")),
            Arguments.of(buildParameterizedConfig(FallbackPriorTaskAssignor.class, null, "min_traffic")),
            Arguments.of(buildParameterizedConfig(FallbackPriorTaskAssignor.class, null, "none")),
            Arguments.of(buildParameterizedConfig(null, StickyTaskAssignor.class, "none")),
            Arguments.of(buildParameterizedConfig(null, StickyTaskAssignor.class, "min_traffic")),
            Arguments.of(buildParameterizedConfig(HighAvailabilityTaskAssignor.class, StickyTaskAssignor.class, "none"))
        );
    }

    /**
     * Builds one parameterized configuration map.
     *
     * @param internalTaskAssignor      class stored by name under {@code internal.task.assignor.class};
     *                                  omitted when {@code null}
     * @param customTaskAssignor        class stored by name under {@code task.assignor.class};
     *                                  omitted when {@code null}
     * @param rackAwareAssignorStrategy value stored under {@code rack.aware.assignment.strategy}
     * @return a new mutable map with the entries described above
     */
    private static Map<String, Object> buildParameterizedConfig(Class<? extends LegacyTaskAssignor> internalTaskAssignor, Class<? extends TaskAssignor> customTaskAssignor, String rackAwareAssignorStrategy) {
        Map<String, Object> settings = new HashMap<>();
        if (internalTaskAssignor != null) {
            settings.put("internal.task.assignor.class", internalTaskAssignor.getName());
        }
        if (customTaskAssignor != null) {
            settings.put("task.assignor.class", customTaskAssignor.getName());
        }
        settings.put("rack.aware.assignment.strategy", rackAwareAssignorStrategy);
        return settings;
    }

    /**
     * Per-test initialization: creates the mock admin client and a fresh {@code topologyMetadata}
     * for the current builder.
     *
     * @param parameterizedConfig per-test settings
     * @param mockListOffsets     forwarded to {@code AssignmentTestUtils.createMockAdminClientForAssignor}
     */
    private void setUp(Map<String, Object> parameterizedConfig, boolean mockListOffsets) {
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS, mockListOffsets);

        // configProps runs after the admin client is assigned, so the reference container receives it.
        StreamsConfig streamsConfig = new StreamsConfig(configProps(parameterizedConfig));
        topologyMetadata = new TopologyMetadata(builder, streamsConfig);
    }

    /**
     * With {@code upgrade.from} set to {@code StreamsConfig.UPGRADE_FROM_23}, the assignor must
     * report EAGER as its only supported rebalance protocol.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldUseEagerRebalancingProtocol(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        createDefaultMockTaskManager();
        Map<String, Object> upgradeOverride = Collections.singletonMap("upgrade.from", StreamsConfig.UPGRADE_FROM_23);
        configurePartitionAssignorWith(upgradeOverride, parameterizedConfig);

        Assertions.assertEquals(1, partitionAssignor.supportedProtocols().size());
        Assertions.assertTrue(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        Assertions.assertFalse(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    /**
     * With the default configuration (no {@code upgrade.from}), the assignor must report exactly
     * two supported rebalance protocols: EAGER and COOPERATIVE.
     *
     * <p>Both protocols are asserted explicitly. A size check plus a COOPERATIVE check alone would
     * also accept a list containing COOPERATIVE twice.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldUseCooperativeRebalancingProtocol(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        configureDefault(parameterizedConfig);

        Assertions.assertEquals(2, partitionAssignor.supportedProtocols().size());
        Assertions.assertTrue(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        Assertions.assertTrue(partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    /**
     * When tasks and consumers are unchanged, the thread-level assignment must be equivalent to
     * the previous one.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);

        Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(CONSUMER_2, Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)),
            Utils.mkEntry(CONSUMER_3, Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)));

        SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        // Record every consumer's previously owned tasks in the client state.
        ClientState state = new ClientState();
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_1,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_2,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_3,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        state.initializePrevTasks(Collections.emptyMap(), false);
        state.computeTaskLags(AssignmentTestUtils.PID_1, getTaskEndOffsetSums(allTasks));

        Map<String, List<TaskId>> newAssignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, true, consumers, state, new HashMap<>());
        assertEquivalentAssignment(previousAssignment, newAssignment);
    }

    /**
     * Adding one new task must keep every consumer's previous tasks in place; the new task is
     * expected to land on CONSUMER_2, the consumer that previously owned the fewest tasks.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        // Mutable copies: a task is appended to allTasks and to CONSUMER_2's expected list below.
        List<TaskId> allTasks = new ArrayList<>(Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3));

        Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, new ArrayList<>(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3))),
            Utils.mkEntry(CONSUMER_2, new ArrayList<>(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0))),
            Utils.mkEntry(CONSUMER_3, new ArrayList<>(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2))));

        SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        ClientState state = new ClientState();
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_1,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_2,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_3,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        state.initializePrevTasks(Collections.emptyMap(), false);
        state.computeTaskLags(AssignmentTestUtils.PID_1, getTaskEndOffsetSums(allTasks));

        // Introduce a task nobody owned before.
        TaskId addedTask = AssignmentTestUtils.TASK_2_0;
        allTasks.add(addedTask);
        state.assignActiveTasks(allTasks);

        Map<String, List<TaskId>> newAssignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, true, consumers, state, new HashMap<>());

        previousAssignment.get(CONSUMER_2).add(addedTask);
        assertEquivalentAssignment(previousAssignment, newAssignment);
    }

    /**
     * When CONSUMER_3 leaves, the two remaining consumers must keep all of their previous tasks
     * and end up with four tasks each.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);

        Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(CONSUMER_2, Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)),
            Utils.mkEntry(CONSUMER_3, Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)));

        SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        ClientState state = new ClientState();
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_1,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_2,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_3,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));
        state.initializePrevTasks(Collections.emptyMap(), false);
        state.computeTaskLags(AssignmentTestUtils.PID_1, getTaskEndOffsetSums(allTasks));

        // CONSUMER_3 leaves the group.
        consumers.remove(CONSUMER_3);

        Map<String, List<TaskId>> assignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, true, consumers, state, new HashMap<>());

        // Each survivor keeps everything it had ...
        Assertions.assertTrue(assignment.get(CONSUMER_1).containsAll(previousAssignment.get(CONSUMER_1)));
        Assertions.assertTrue(assignment.get(CONSUMER_2).containsAll(previousAssignment.get(CONSUMER_2)));

        // ... and the eight tasks are split evenly.
        MatcherAssert.assertThat(assignment.get(CONSUMER_1).size(), CoreMatchers.equalTo(4));
        MatcherAssert.assertThat(assignment.get(CONSUMER_2).size(), CoreMatchers.equalTo(4));
    }

    /**
     * When CONSUMER_4 joins with no previous tasks, CONSUMER_1 and CONSUMER_3 may only give tasks
     * away, CONSUMER_2 keeps its previous tasks, and all four consumers end up with two tasks.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);

        Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(CONSUMER_2, Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)),
            Utils.mkEntry(CONSUMER_3, Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)));

        SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        ClientState state = new ClientState();
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_1,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_2,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0), AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_3,
            getTaskOffsetSums(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2), AssignmentTestUtils.EMPTY_TASKS));

        // CONSUMER_4 joins the group without any previous tasks.
        consumers.add(CONSUMER_4);
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_4,
            getTaskOffsetSums(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS));

        state.initializePrevTasks(Collections.emptyMap(), false);
        state.computeTaskLags(AssignmentTestUtils.PID_1, getTaskEndOffsetSums(allTasks));

        Map<String, List<TaskId>> assignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, true, consumers, state, new HashMap<>());

        // CONSUMER_1 and CONSUMER_3 end up with a subset of what they had before.
        Assertions.assertTrue(previousAssignment.get(CONSUMER_1).containsAll(assignment.get(CONSUMER_1)));
        Assertions.assertTrue(previousAssignment.get(CONSUMER_3).containsAll(assignment.get(CONSUMER_3)));

        // CONSUMER_2 keeps everything it had before.
        Assertions.assertTrue(assignment.get(CONSUMER_2).containsAll(previousAssignment.get(CONSUMER_2)));

        MatcherAssert.assertThat(assignment.get(CONSUMER_1).size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(assignment.get(CONSUMER_2).size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(assignment.get(CONSUMER_3).size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(assignment.get(CONSUMER_4).size(), CoreMatchers.equalTo(2));
    }

    /**
     * With no previous ownership, the thread-level assignment must match the fixed interleaved
     * layout below regardless of the order in which the tasks are supplied (the input list is
     * shuffled before the call).
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void shouldInterleaveTasksByGroupIdDuringNewAssignment(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1);

        Map<String, List<TaskId>> expectedAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, new ArrayList<>(Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_2))),
            Utils.mkEntry(CONSUMER_2, new ArrayList<>(Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_2_0))),
            Utils.mkEntry(CONSUMER_3, new ArrayList<>(Arrays.asList(AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_2_1))));

        ClientState state = new ClientState();
        SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        // Register every consumer, in sorted order, with no previously owned tasks.
        for (String consumer : consumers) {
            state.addPreviousTasksAndOffsetSums(consumer, Collections.emptyMap());
        }

        Collections.shuffle(allTasks);

        Map<String, List<TaskId>> interleavedTaskIds =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, true, consumers, state, new HashMap<>());

        MatcherAssert.assertThat(interleavedTaskIds, CoreMatchers.equalTo(expectedAssignment));
    }

    /**
     * Under the EAGER protocol ({@code upgrade.from} set to {@code StreamsConfig.UPGRADE_FROM_23}),
     * the subscription user data must decode to the subscription info built from the task
     * manager's active and standby tasks.
     */
    @ParameterizedTest
    @MethodSource("parameter")
    public void testEagerSubscription(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        builder.addSource(null, "source1", null, null, null, "topic1");
        builder.addSource(null, "source2", null, null, null, "topic2");
        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2");

        Set<TaskId> prevTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        Set<TaskId> standbyTasks = Utils.mkSet(new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
        createMockTaskManager(prevTasks, standbyTasks);

        Map<String, Object> upgradeOverride = Collections.singletonMap("upgrade.from", StreamsConfig.UPGRADE_FROM_23);
        configurePartitionAssignorWith(upgradeOverride, parameterizedConfig);
        MatcherAssert.assertThat(partitionAssignor.rebalanceProtocol(), CoreMatchers.equalTo(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));

        Set<String> topics = Utils.mkSet("topic1", "topic2");
        ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));

        // Sort before comparing: the topic list is copied from a set, so its order is not fixed.
        Collections.sort(subscription.topics());
        Assertions.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());

        SubscriptionInfo expectedInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, prevTasks, standbyTasks, uniqueField);
        Assertions.assertEquals(expectedInfo, SubscriptionInfo.decode(subscription.userData()));
    }

    /**
     * Verifies the subscription built by the default-configured assignor: the subscribed topics
     * must be {@code topic1} and {@code topic2}, and the subscription user data must decode to the
     * {@link SubscriptionInfo} produced by {@code AssignmentTestUtils.getInfo} for the same active
     * and standby task sets.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testCooperativeSubscription(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);

        // Topology: two single-topic sources feeding one processor.
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});

        // Note that the standby set passed to the mock also contains the three active task ids.
        TaskId[] activeIds = new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)};
        TaskId[] standbyIds = new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)};
        Set previousActive = Utils.mkSet((Object[])activeIds);
        Set previousStandby = Utils.mkSet((Object[])standbyIds);
        this.createMockTaskManager(previousActive, previousStandby);
        this.configureDefaultPartitionAssignor(parameterizedConfig);

        Set subscribedTopics = Utils.mkSet((Object[])new String[]{"topic1", "topic2"});
        ArrayList topicList = new ArrayList(subscribedTopics);
        ConsumerPartitionAssignor.Subscription builtSubscription = new ConsumerPartitionAssignor.Subscription(topicList, this.partitionAssignor.subscriptionUserData(subscribedTopics));

        // The topic list came from a set, so sort it before comparing with an ordered list.
        Collections.sort(builtSubscription.topics());
        Assertions.assertEquals(Arrays.asList("topic1", "topic2"), (Object)builtSubscription.topics());

        SubscriptionInfo expectedInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, (Set<TaskId>)previousActive, (Set<TaskId>)previousStandby, (byte)1);
        SubscriptionInfo decodedInfo = SubscriptionInfo.decode((ByteBuffer)builtSubscription.userData());
        Assertions.assertEquals((Object)expectedInfo, (Object)decodedInfo);
    }

    /**
     * Runs an assignment for three consumers ({@code consumer10} and {@code consumer11} subscribe
     * with {@code PID_1}, {@code consumer20} with {@code PID_2}) over a topology with two sources,
     * one processor and one state store, then checks that:
     * <ul>
     *   <li>{@code consumer10} and {@code consumer11} together receive partitions 0 and 1 of both topics;</li>
     *   <li>{@code consumer20} receives partition 2 of both topics;</li>
     *   <li>the active tasks across all three consumers are exactly {@code TASK_0_0..TASK_0_2}.</li>
     * </ul>
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignBasic(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store", false), new String[]{"processor"});
        List<String> topics = Arrays.asList("topic1", "topic2");
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});

        // Previously owned active/standby tasks reported by each consumer in its subscription.
        Set prevTasks10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set prevTasks11 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set prevTasks20 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set<TaskId> standbyTasks10 = AssignmentTestUtils.EMPTY_TASKS;
        Set standbyTasks11 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set standbyTasks20 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        this.createMockTaskManager(prevTasks10, standbyTasks10);

        // Mock admin client and partition info for the store's changelog topic.
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store-changelog"), Collections.singletonList(3)), true);
        List<Map<String, List<TopicPartitionInfo>>> partitionInfo = Collections.singletonList(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"stream-partition-assignor-test-store-changelog", Collections.singletonList(new TopicPartitionInfo(0, new Node(1, "h1", 80), Collections.singletonList(new Node(1, "h1", 80)), Collections.emptyList())))}));
        this.configurePartitionAssignorWith(Collections.emptyMap(), partitionInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, prevTasks10, standbyTasks10).encode(), Collections.emptyList(), -1, Optional.of("rack3")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, prevTasks11, standbyTasks11).encode(), Collections.emptyList(), -1, Optional.of("rack3")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, prevTasks20, standbyTasks20).encode(), Collections.emptyList(), -1, Optional.of("rack0")));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // consumer10/consumer11 are compared as an unordered pair: either may get partition 0 or 1.
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t2p0}), Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t2p1})}), (Object)Utils.mkSet((Object[])new HashSet[]{new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer10")).partitions()), new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer11")).partitions())}));
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p2, this.t2p2}), new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer20")).partitions()));

        // Accumulate active tasks consumer by consumer.
        AssignmentInfo info10 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks());
        AssignmentInfo info11 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1}), allActiveTasks);
        AssignmentInfo info20 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks());

        // Every task must be active on exactly one consumer (checked once; the original repeated this pair).
        Assertions.assertEquals((int)3, (int)allActiveTasks.size());
        Assertions.assertEquals((Object)allTasks, allActiveTasks);
    }

    /**
     * Runs an assignment for two consumers that both subscribe with the default subscription info,
     * over two independent source/processor pairs whose topics have four partitions each, and
     * checks that the eight partitions are split into the two expected groups of four and that the
     * active task lists of both consumers match the expected, ordered lists.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);

        // Two independent source -> processor chains.
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addProcessor("processorII", new MockApiProcessorSupplier(), new String[]{"source2"});

        // Cluster metadata local to this test: four partitions per topic.
        List<PartitionInfo> fourPartitionInfos = Arrays.asList(
            new PartitionInfo("topic1", 0, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0),
            new PartitionInfo("topic1", 1, AssignmentTestUtils.NODE_1, AssignmentTestUtils.REPLICA_1, AssignmentTestUtils.REPLICA_1),
            new PartitionInfo("topic1", 2, AssignmentTestUtils.NODE_2, AssignmentTestUtils.REPLICA_2, AssignmentTestUtils.REPLICA_2),
            new PartitionInfo("topic1", 3, AssignmentTestUtils.NODE_3, AssignmentTestUtils.REPLICA_3, AssignmentTestUtils.REPLICA_3),
            new PartitionInfo("topic2", 0, AssignmentTestUtils.NODE_4, AssignmentTestUtils.REPLICA_4, AssignmentTestUtils.REPLICA_4),
            new PartitionInfo("topic2", 1, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0),
            new PartitionInfo("topic2", 2, AssignmentTestUtils.NODE_1, AssignmentTestUtils.REPLICA_1, AssignmentTestUtils.REPLICA_1),
            new PartitionInfo("topic2", 3, AssignmentTestUtils.NODE_2, AssignmentTestUtils.REPLICA_2, AssignmentTestUtils.REPLICA_2));
        Cluster fourPartitionCluster = new Cluster(
            "cluster",
            Arrays.asList(AssignmentTestUtils.NODE_0, AssignmentTestUtils.NODE_1, AssignmentTestUtils.NODE_2, AssignmentTestUtils.NODE_3, AssignmentTestUtils.NODE_4),
            fourPartitionInfos,
            Collections.emptySet(),
            Collections.emptySet());

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.configureDefault(parameterizedConfig);
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack2")));

        Map groupAssignment = this.partitionAssignor.assign(fourPartitionCluster, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        ConsumerPartitionAssignor.Assignment assignment10 = (ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer10");
        ConsumerPartitionAssignor.Assignment assignment11 = (ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer11");

        // Partition groups are compared as an unordered pair of sets.
        Set evenPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t2p2, this.t1p0, this.t1p2, this.t2p0});
        Set oddPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t2p1, this.t1p3, this.t2p3});
        Set expectedPartitionGroups = Utils.mkSet((Object[])new Set[]{evenPartitions, oddPartitions});
        Set actualPartitionGroups = Utils.mkSet((Object[])new HashSet[]{new HashSet(assignment10.partitions()), new HashSet(assignment11.partitions())});
        Assertions.assertEquals((Object)expectedPartitionGroups, (Object)actualPartitionGroups);

        // Active tasks are compared as ordered lists.
        AssignmentInfo decoded10 = AssignmentInfo.decode((ByteBuffer)assignment10.userData());
        List<TaskId> expectedTasks10 = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_2);
        Assertions.assertEquals(expectedTasks10, (Object)decoded10.activeTasks());

        AssignmentInfo decoded11 = AssignmentInfo.decode((ByteBuffer)assignment11.userData());
        List<TaskId> expectedTasks11 = Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3);
        Assertions.assertEquals(expectedTasks11, (Object)decoded11.activeTasks());
    }

    /**
     * Runs an assignment in which {@code consumer10} and {@code consumer11} (both {@code PID_1})
     * already own all four partitions of {@code topic1}, while {@code consumer20} ({@code PID_2})
     * joins with no previous tasks, and checks that {@code consumer20} ends up with neither active
     * nor standby tasks in its decoded assignment.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldNotAssignTemporaryStandbyTask(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});

        // Cluster metadata local to this test: topic1 with four partitions on three nodes.
        List<PartitionInfo> topic1Partitions = Arrays.asList(
            new PartitionInfo("topic1", 0, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0),
            new PartitionInfo("topic1", 1, AssignmentTestUtils.NODE_1, AssignmentTestUtils.REPLICA_1, AssignmentTestUtils.REPLICA_1),
            new PartitionInfo("topic1", 2, AssignmentTestUtils.NODE_2, AssignmentTestUtils.REPLICA_2, AssignmentTestUtils.REPLICA_2),
            new PartitionInfo("topic1", 3, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_1, AssignmentTestUtils.REPLICA_2));
        Cluster topic1Cluster = new Cluster(
            "cluster",
            Arrays.asList(AssignmentTestUtils.NODE_0, AssignmentTestUtils.NODE_1, AssignmentTestUtils.NODE_2),
            topic1Partitions,
            Collections.emptySet(),
            Collections.emptySet());

        List<String> subscribedTopics = Collections.singletonList("topic1");
        this.createMockTaskManager(Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3}), Collections.emptySet());
        this.configureDefaultPartitionAssignor(parameterizedConfig);

        // The two PID_1 consumers report the tasks and partitions they currently own.
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode(),
            Arrays.asList(this.t1p0, this.t1p2),
            -1,
            Optional.of("rack2")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_3}), Collections.emptySet()).encode(),
            Arrays.asList(this.t1p1, this.t1p3),
            -1,
            Optional.of("rack2")));
        // The PID_2 consumer joins with nothing.
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, Collections.emptySet(), Collections.emptySet()).encode(),
            Collections.emptyList(),
            -1,
            Optional.of("rack2")));

        Map groupAssignment = this.partitionAssignor.assign(topic1Cluster, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        ConsumerPartitionAssignor.Assignment newcomerAssignment = (ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer20");
        AssignmentInfo newcomerInfo = AssignmentInfo.decode((ByteBuffer)newcomerAssignment.userData());
        Assertions.assertTrue((boolean)newcomerInfo.activeTasks().isEmpty());
        Assertions.assertTrue((boolean)newcomerInfo.standbyTasks().isEmpty());
    }

    /**
     * Runs the assignor twice for a single consumer:
     * <ol>
     *   <li>against a cluster with no partition metadata, expecting no partitions and no active tasks;</li>
     *   <li>against {@code this.metadata}, expecting partitions 0-2 of both topics and the active
     *       tasks {@code TASK_0_0..TASK_0_2}.</li>
     * </ol>
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignEmptyMetadata(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        List<String> topics = Arrays.asList("topic1", "topic2");
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set prevTasks10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set standbyTasks10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        this.createMockTaskManager(prevTasks10, standbyTasks10);
        this.configureDefaultPartitionAssignor(parameterizedConfig);
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, prevTasks10, standbyTasks10).encode(), Collections.emptyList(), -1, Optional.of("rack3")));

        // First pass: no partition metadata, so nothing can be assigned.
        Map assignments = this.partitionAssignor.assign(emptyMetadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assertions.assertEquals(Collections.emptySet(), new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer10")).partitions()));
        AssignmentInfo info10 = StreamsPartitionAssignorTest.checkAssignment(Collections.emptySet(), (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks());
        Assertions.assertEquals((int)0, (int)allActiveTasks.size());

        // Second pass: with real metadata the single consumer gets every partition.
        // (The original literal listed t1p0/t2p0 twice; a set ignores the repeats.)
        assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t2p0, this.t1p1, this.t2p1, this.t1p2, this.t2p2})}), (Object)Utils.mkSet((Object[])new HashSet[]{new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer10")).partitions())}));
        info10 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks());

        // Checked once; the original repeated this size/equality pair.
        Assertions.assertEquals((int)3, (int)allActiveTasks.size());
        Assertions.assertEquals((Object)allTasks, allActiveTasks);
    }

    /**
     * Runs an assignment for three consumers that previously owned {@code TASK_0_0..TASK_0_2},
     * over a topology that also reads {@code topic3}, and checks that across all consumers the
     * active tasks are {@code TASK_0_0..TASK_0_3} and the assigned partitions are the ten expected
     * partitions of {@code topic1}, {@code topic2} and {@code topic3}.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithNewTasks(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);

        // Three single-topic sources feeding one processor.
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addSource(null, "source3", null, null, null, new String[]{"topic3"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2", "source3"});

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2", "topic3");
        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3});
        Set previous10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set previous11 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set previous20 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        this.createMockTaskManager(previous10, AssignmentTestUtils.EMPTY_TASKS);
        this.configureDefaultPartitionAssignor(parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, previous10, AssignmentTestUtils.EMPTY_TASKS).encode(), Collections.emptyList(), -1, Optional.of("rack4")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, previous11, AssignmentTestUtils.EMPTY_TASKS).encode(), Collections.emptyList(), -1, Optional.of("rack4")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, previous20, AssignmentTestUtils.EMPTY_TASKS).encode(), Collections.emptyList(), -1, Optional.of("rack1")));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Union the active tasks and partitions handed to every consumer.
        HashSet collectedTasks = new HashSet();
        HashSet collectedPartitions = new HashSet();
        for (String consumer : Arrays.asList("consumer10", "consumer11", "consumer20")) {
            AssignmentInfo decoded = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer)).userData());
            collectedTasks.addAll(decoded.activeTasks());
            collectedPartitions.addAll(((ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer)).partitions());
        }

        Assertions.assertEquals((Object)expectedTasks, collectedTasks);
        Set expectedPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2, this.t2p0, this.t2p1, this.t2p2, this.t3p0, this.t3p1, this.t3p2, this.t3p3});
        Assertions.assertEquals((Object)expectedPartitions, collectedPartitions);
    }

    /**
     * Runs an assignment for three consumers over two sub-topologies with state stores
     * ({@code store1} on {@code processor-1}; {@code store2} and {@code store3} on
     * {@code processor-2}) and checks that each consumer gets two partitions and two active tasks,
     * that all six tasks are assigned, and that {@code tasksForState} maps each store to the tasks
     * of its sub-topology.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStates(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);

        // Sub-topology for topic1 with one store, sub-topology for topic2 with two stores.
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor-1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor-1"});
        this.builder.addProcessor("processor-2", new MockApiProcessorSupplier(), new String[]{"source2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store2", false), new String[]{"processor-2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store3", false), new String[]{"processor-2"});

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        List<TaskId> expectedTasks = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2);

        // Mock admin client and partition info for the three changelog topics.
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Arrays.asList("stream-partition-assignor-test-store1-changelog", "stream-partition-assignor-test-store2-changelog", "stream-partition-assignor-test-store3-changelog"), Arrays.asList(3, 3, 3)), true);
        this.createDefaultMockTaskManager();
        List<Map<String, List<TopicPartitionInfo>>> changelogInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(3, Collections.singletonList(Utils.mkSet((Object[])new String[]{"stream-partition-assignor-test-store1-changelog", "stream-partition-assignor-test-store2-changelog", "stream-partition-assignor-test-store3-changelog"})));
        this.configurePartitionAssignorWith(Collections.emptyMap(), changelogInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack1")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack1")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Every consumer must receive exactly two partitions.
        for (String consumer : Arrays.asList("consumer10", "consumer11", "consumer20")) {
            Assertions.assertEquals((int)2, (int)((ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer)).partitions().size());
        }

        AssignmentInfo decoded10 = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer10")).userData());
        AssignmentInfo decoded11 = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer11")).userData());
        AssignmentInfo decoded20 = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer20")).userData());

        // Every consumer must receive exactly two active tasks; together they cover all six.
        HashSet assignedTasks = new HashSet();
        for (AssignmentInfo decoded : Arrays.asList(decoded10, decoded11, decoded20)) {
            Assertions.assertEquals((int)2, (int)decoded.activeTasks().size());
            assignedTasks.addAll(decoded.activeTasks());
        }
        Assertions.assertEquals(new HashSet<TaskId>(expectedTasks), assignedTasks);

        // Each store's changelog must map back to the tasks of the sub-topology that owns it.
        Map topicGroups = this.builder.subtopologyToTopicsInfo();
        Set subtopology0Tasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Assertions.assertEquals((Object)subtopology0Tasks, StreamsPartitionAssignorTest.tasksForState("store1", expectedTasks, topicGroups));
        Set subtopology1TasksForStore2 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2});
        Assertions.assertEquals((Object)subtopology1TasksForStore2, StreamsPartitionAssignorTest.tasksForState("store2", expectedTasks, topicGroups));
        Set subtopology1TasksForStore3 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2});
        Assertions.assertEquals((Object)subtopology1TasksForStore3, StreamsPartitionAssignorTest.tasksForState("store3", expectedTasks, topicGroups));
    }

    /**
     * Collects the tasks that belong to any sub-topology whose state changelog topics include the
     * changelog topic of the given store.
     *
     * @param storeName   name of the state store whose changelog topic is looked up
     * @param tasks       candidate tasks to filter
     * @param topicGroups topics info keyed by sub-topology
     * @return the tasks from {@code tasks} whose sub-topology id equals the {@code nodeGroupId} of
     *         a sub-topology that lists the store's changelog topic; empty if none match
     */
    private static Set<TaskId> tasksForState(String storeName, List<TaskId> tasks, Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups) {
        String storeChangelog = ProcessorStateManager.storeChangelogTopic((String)APPLICATION_ID, (String)storeName, null);
        HashSet<TaskId> matchingTasks = new HashSet<TaskId>();
        for (Map.Entry<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> group : topicGroups.entrySet()) {
            TopologyMetadata.Subtopology subtopology = group.getKey();
            InternalTopologyBuilder.TopicsInfo topicsInfo = group.getValue();
            // Only sub-topologies that own the store's changelog are of interest.
            if (topicsInfo.stateChangelogTopics.containsKey(storeChangelog)) {
                for (TaskId candidate : tasks) {
                    if (candidate.subtopology() == subtopology.nodeGroupId) {
                        matchingTasks.add(candidate);
                    }
                }
            }
        }
        return matchingTasks;
    }

    /**
     * Runs an assignment with {@code num.standby.replicas} set to 1 over a topology that has no
     * state store, and checks that neither consumer is given any standby tasks.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStandbyReplicasAndStatelessTasks(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);

        // One source reading both topics, one processor, no stores.
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1", "topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.createMockTaskManager(Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet());
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet()).encode(),
            Collections.emptyList(),
            -1,
            Optional.of("rack0")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode(),
            Collections.emptyList(),
            -1,
            Optional.of("rack2")));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Validate each consumer's assignment in turn and require an empty standby map.
        for (String consumer : Arrays.asList("consumer10", "consumer20")) {
            AssignmentInfo checkedInfo = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer));
            Assertions.assertTrue((boolean)checkedInfo.standbyTasks().isEmpty());
        }
    }

    /**
     * Runs an assignment with {@code num.standby.replicas} set to 1 over a topology whose only
     * state store is built with {@code withLoggingDisabled()}, and checks that neither consumer is
     * given any standby tasks.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStandbyReplicasAndLoggingDisabled(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);

        // One source reading both topics, one processor, one store with logging disabled.
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1", "topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false).withLoggingDisabled(), new String[]{"processor"});

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.createMockTaskManager(Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet());
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet()).encode(),
            Collections.emptyList(),
            -1,
            Optional.of("rack1")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode(),
            Collections.emptyList(),
            -1,
            Optional.of("rack3")));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Validate each consumer's assignment in turn and require an empty standby map.
        for (String consumer : Arrays.asList("consumer10", "consumer20")) {
            AssignmentInfo checkedInfo = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer));
            Assertions.assertTrue((boolean)checkedInfo.standbyTasks().isEmpty());
        }
    }

    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStandbyReplicas(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1", "source2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor"});
        List<String> topics = Arrays.asList("topic1", "topic2");
        Set allTopicPartitions = topics.stream().map(topic -> Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2))).flatMap(Collection::stream).collect(Collectors.toSet());
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set prevTasks00 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set prevTasks01 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set prevTasks02 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set standbyTasks00 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set standbyTasks01 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set standbyTasks02 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        this.createMockTaskManager(prevTasks00, standbyTasks01);
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)), true);
        List<Map<String, List<TopicPartitionInfo>>> changelogTopicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(3, Collections.singletonList(Utils.mkSet((Object[])new String[]{"stream-partition-assignor-test-store1-changelog"})));
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), changelogTopicPartitionInfo, parameterizedConfig);
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, (Set<TaskId>)prevTasks00, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack0")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, (Set<TaskId>)prevTasks01, (Set<TaskId>)standbyTasks02, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack0")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, (Set<TaskId>)prevTasks02, (Set<TaskId>)standbyTasks00, OTHER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack4")));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        AssignmentInfo info10 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks());
        HashSet allStandbyTasks = new HashSet(info10.standbyTasks().keySet());
        AssignmentInfo info11 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks());
        allStandbyTasks.addAll(info11.standbyTasks().keySet());
        Assertions.assertNotEquals(info11.standbyTasks().keySet(), info10.standbyTasks().keySet(), (String)"same processId has same set of standby tasks");
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1}), new HashSet(allActiveTasks));
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2}), new HashSet(allStandbyTasks));
        AssignmentInfo info20 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks());
        allStandbyTasks.addAll(info20.standbyTasks().keySet());
        Assertions.assertEquals((int)3, (int)allActiveTasks.size());
        Assertions.assertEquals((Object)allTasks, allActiveTasks);
        Assertions.assertEquals((int)3, (int)allStandbyTasks.size());
        Assertions.assertEquals((Object)allTasks, allStandbyTasks);
        Map partitionsByHost = info10.partitionsByHost();
        Assertions.assertEquals((int)2, (int)partitionsByHost.size());
        Assertions.assertEquals(allTopicPartitions, partitionsByHost.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()));
        Map standbyPartitionsByHost = info10.standbyPartitionByHost();
        Assertions.assertEquals((int)2, (int)standbyPartitionsByHost.size());
        Assertions.assertEquals(allTopicPartitions, standbyPartitionsByHost.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()));
        for (HostInfo hostInfo : partitionsByHost.keySet()) {
            Assertions.assertTrue((boolean)Collections.disjoint((Collection)partitionsByHost.get(hostInfo), (Collection)standbyPartitionsByHost.get(hostInfo)));
        }
        Assertions.assertEquals((Object)partitionsByHost, (Object)info11.partitionsByHost());
        Assertions.assertEquals((Object)partitionsByHost, (Object)info20.partitionsByHost());
        Assertions.assertEquals((Object)standbyPartitionsByHost, (Object)info11.standbyPartitionByHost());
        Assertions.assertEquals((Object)standbyPartitionsByHost, (Object)info20.standbyPartitionByHost());
    }

    /**
     * Verifies that active and standby tasks are spread thinly when seven consumers share one
     * stateful sub-topology.
     *
     * <p>The topology reads {@code topic1} into a processor backed by {@code store1}; the store's
     * changelog is described with three partitions and {@code num.standby.replicas} is 1. Four
     * consumers subscribe under {@code PID_1} and three under {@code PID_2}. After the assignment
     * every consumer must own at most one task, counting active and standby tasks together.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStandbyReplicasBalanceSparse(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor"});
        List<String> subscribedTopics = Collections.singletonList("topic1");
        this.createMockTaskManager(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);

        String changelogTopic = "stream-partition-assignor-test-store1-changelog";
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList(changelogTopic), Collections.singletonList(3)), true);

        // One changelog partition per node/replica constant: 0, 1 and 3.
        TopicPartitionInfo changelogPartition0 = new TopicPartitionInfo(0, AssignmentTestUtils.NODE_0, Arrays.asList(AssignmentTestUtils.REPLICA_0), Arrays.asList(AssignmentTestUtils.REPLICA_0));
        TopicPartitionInfo changelogPartition1 = new TopicPartitionInfo(1, AssignmentTestUtils.NODE_1, Arrays.asList(AssignmentTestUtils.REPLICA_1), Arrays.asList(AssignmentTestUtils.REPLICA_1));
        TopicPartitionInfo changelogPartition2 = new TopicPartitionInfo(2, AssignmentTestUtils.NODE_3, Arrays.asList(AssignmentTestUtils.REPLICA_3), Arrays.asList(AssignmentTestUtils.REPLICA_3));
        Map changelogTopicPartitionInfo = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)changelogTopic, Arrays.asList(changelogPartition0, changelogPartition1, changelogPartition2))});
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), Collections.singletonList(changelogTopicPartitionInfo), parameterizedConfig);

        List<String> client1Consumers = Arrays.asList("consumer10", "consumer11", "consumer12", "consumer13");
        List<String> client2Consumers = Arrays.asList("consumer20", "consumer21", "consumer22");
        client1Consumers.forEach(member -> this.subscriptions.put(member, new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack2"))));
        client2Consumers.forEach(member -> this.subscriptions.put(member, new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack4"))));

        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Decode every consumer's assignment first, then run the checks in the same consumer order.
        List<String> allConsumers = new ArrayList<>(client1Consumers);
        allConsumers.addAll(client2Consumers);
        List<AssignmentInfo> decodedInfos = new ArrayList<>();
        for (String member : allConsumers) {
            ConsumerPartitionAssignor.Assignment memberAssignment = (ConsumerPartitionAssignor.Assignment)assignments.get(member);
            decodedInfos.add(AssignmentInfo.decode(memberAssignment.userData()));
        }
        for (AssignmentInfo decoded : decodedInfos) {
            int ownedTasks = decoded.activeTasks().size() + decoded.standbyTasks().size();
            Assertions.assertTrue(ownedTasks <= 1);
        }
    }

    /**
     * Verifies the task split when only two consumers share one stateful sub-topology with one
     * standby replica configured.
     *
     * <p>{@code consumer10} subscribes under {@code PID_1} and {@code consumer20} under
     * {@code PID_2}. Each consumer must end up with exactly three tasks (active plus standby) and
     * with fewer than three active tasks.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStandbyReplicasBalanceDense(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor"});
        List<String> subscribedTopics = Collections.singletonList("topic1");
        this.createMockTaskManager(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);

        String changelogTopic = "stream-partition-assignor-test-store1-changelog";
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList(changelogTopic), Collections.singletonList(3)), true);
        List<Map<String, List<TopicPartitionInfo>>> changelogTopicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(3, Collections.singletonList(Utils.mkSet((Object[])new String[]{changelogTopic})));
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), changelogTopicPartitionInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack4")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack4")));

        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        ConsumerPartitionAssignor.Assignment firstAssignment = (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10");
        AssignmentInfo firstInfo = AssignmentInfo.decode(firstAssignment.userData());
        ConsumerPartitionAssignor.Assignment secondAssignment = (ConsumerPartitionAssignor.Assignment)assignments.get("consumer20");
        AssignmentInfo secondInfo = AssignmentInfo.decode(secondAssignment.userData());

        // Every consumer carries three tasks in total ...
        Assertions.assertEquals(3, firstInfo.activeTasks().size() + firstInfo.standbyTasks().size());
        Assertions.assertEquals(3, secondInfo.activeTasks().size() + secondInfo.standbyTasks().size());
        // ... but neither may hold all three as active tasks.
        Assertions.assertTrue(firstInfo.activeTasks().size() < 3);
        Assertions.assertTrue(secondInfo.activeTasks().size() < 3);
    }

    /**
     * Verifies task balance when a stateful sub-topology ({@code topic1} with {@code store1}) is
     * combined with a sub-topology that has no state store ({@code topic2}).
     *
     * <p>Two consumers subscribe under {@code PID_1} and two under {@code PID_2}, with
     * {@code num.standby.replicas} set to 1. Every consumer must own at most three tasks in total
     * and at most one standby task.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithStandbyReplicasBalanceWithStatelessTasks(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor_with_state", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor_with_state"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source2"});
        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.createMockTaskManager(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);

        String changelogTopic = "stream-partition-assignor-test-store1-changelog";
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList(changelogTopic), Collections.singletonList(3)), true);
        List<Map<String, List<TopicPartitionInfo>>> changelogTopicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(3, Collections.singletonList(Utils.mkSet((Object[])new String[]{changelogTopic})));
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), changelogTopicPartitionInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack0")));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack0")));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        this.subscriptions.put("consumer21", new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack2")));

        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Decode all four assignments up front, in consumer order.
        List<AssignmentInfo> decodedInfos = new ArrayList<>();
        for (String member : Arrays.asList("consumer10", "consumer11", "consumer20", "consumer21")) {
            ConsumerPartitionAssignor.Assignment memberAssignment = (ConsumerPartitionAssignor.Assignment)assignments.get(member);
            decodedInfos.add(AssignmentInfo.decode(memberAssignment.userData()));
        }

        // First the cap on the total number of tasks per consumer ...
        for (AssignmentInfo decoded : decodedInfos) {
            Assertions.assertTrue(decoded.activeTasks().size() + decoded.standbyTasks().size() <= 3);
        }
        // ... then the cap on standby tasks per consumer.
        for (AssignmentInfo decoded : decodedInfos) {
            Assertions.assertTrue(decoded.standbyTasks().size() <= 1);
        }
    }

    /**
     * Verifies that {@code onAssignment} forwards a decoded assignment to the task manager and to
     * the streams metadata state.
     *
     * <p>An {@link AssignmentInfo} with active tasks {@code TASK_0_0}/{@code TASK_0_3}, standby
     * tasks {@code TASK_0_1}/{@code TASK_0_2} and a single host entry is encoded into an assignment
     * for partitions {@code t3p0} and {@code t3p3}. The test then checks that
     * {@code StreamsMetadataState.onChange} received the host map, that
     * {@code TaskManager.handleAssignment} received the expected active and standby maps, and that
     * the captured third {@code onChange} argument contains exactly the two assigned partitions.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testOnAssignment(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);

        // Both collaborators are replaced with mocks before the assignor is configured.
        this.taskManager = (TaskManager)Mockito.mock(TaskManager.class);
        this.streamsMetadataState = (StreamsMetadataState)Mockito.mock(StreamsMetadataState.class);

        Map<HostInfo, Set> hostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet((Object[])new TopicPartition[]{this.t3p0, this.t3p3}));

        HashMap<TaskId, Set> expectedActive = new HashMap<TaskId, Set>();
        expectedActive.put(AssignmentTestUtils.TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{this.t3p0}));
        expectedActive.put(AssignmentTestUtils.TASK_0_3, Utils.mkSet((Object[])new TopicPartition[]{this.t3p3}));

        HashMap<TaskId, Set> expectedStandby = new HashMap<TaskId, Set>();
        expectedStandby.put(AssignmentTestUtils.TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{this.t3p1}));
        expectedStandby.put(AssignmentTestUtils.TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{this.t3p2}));

        this.configureDefaultPartitionAssignor(parameterizedConfig);

        List<TaskId> activeTaskIds = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3);
        // NOTE(review): the leading 11 is presumably the assignment metadata version - confirm against AssignmentInfo.
        AssignmentInfo encodedInfo = new AssignmentInfo(11, activeTaskIds, expectedStandby, hostState, Collections.emptyMap(), 0);
        ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(Arrays.asList(this.t3p0, this.t3p3), encodedInfo.encode());

        this.partitionAssignor.onAssignment(assignment, null);

        ((StreamsMetadataState)Mockito.verify((Object)this.streamsMetadataState)).onChange((Map)ArgumentMatchers.eq(hostState), (Map)ArgumentMatchers.any(), (Map)this.topicPartitionInfoCaptor.capture());
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleAssignment(expectedActive, expectedStandby);

        Map capturedPartitionInfo = (Map)this.topicPartitionInfoCaptor.getValue();
        Assertions.assertTrue(capturedPartitionInfo.containsKey(this.t3p0));
        Assertions.assertTrue(capturedPartitionInfo.containsKey(this.t3p3));
        Assertions.assertEquals(2, capturedPartitionInfo.size());
    }

    /**
     * Verifies that an internal topic used between two sub-topologies is made ready during
     * assignment.
     *
     * <p>{@code processor1} writes to the internal topic {@code topicX} and {@code source2} reads
     * it back. After a single consumer is assigned, the mock internal topic manager must report
     * exactly one ready topic, and the value recorded for the prefixed {@code topicX} must equal
     * the size of the {@code TASK_0_0..TASK_0_2} set (three).
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithInternalTopics(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);
        this.builder.addInternalTopic("topicX", InternalTopicProperties.empty());
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", null, null, null, new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topicX"});
        this.builder.addProcessor("processor2", new MockApiProcessorSupplier(), new String[]{"source2"});

        // Name of topicX as it appears in the subscription and in readyTopics.
        String prefixedTopicX = "stream-partition-assignor-test-topicX";
        List<String> subscribedTopics = Arrays.asList("topic1", prefixedTopicX);
        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.createDefaultMockTaskManager();

        List<Map<String, List<TopicPartitionInfo>>> internalTopicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(4, Collections.singletonList(Utils.mkSet((Object[])new String[]{prefixedTopicX})));
        MockInternalTopicManager topicManager = this.configurePartitionAssignorWith(Collections.emptyMap(), internalTopicPartitionInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));

        Assertions.assertEquals(1, topicManager.readyTopics.size());
        long readyValueForTopicX = topicManager.readyTopics.get(prefixedTopicX).intValue();
        Assertions.assertEquals((long)expectedTasks.size(), readyValueForTopicX);
    }

    /**
     * Verifies that chained internal topics are both made ready during assignment.
     *
     * <p>{@code topic1} feeds the internal topic {@code topicX}, which in turn feeds the internal
     * topic {@code topicZ}. After a single consumer is assigned, the mock internal topic manager
     * must report two ready topics, and the value recorded for the prefixed {@code topicZ} must
     * equal the size of the {@code TASK_0_0..TASK_0_2} set (three).
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);
        this.builder.addInternalTopic("topicX", InternalTopicProperties.empty());
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", null, null, null, new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topicX"});
        this.builder.addInternalTopic("topicZ", InternalTopicProperties.empty());
        this.builder.addProcessor("processor2", new MockApiProcessorSupplier(), new String[]{"source2"});
        this.builder.addSink("sink2", "topicZ", null, null, null, new String[]{"processor2"});
        this.builder.addSource(null, "source3", null, null, null, new String[]{"topicZ"});

        // Names of the internal topics as they appear in the subscription and in readyTopics.
        String prefixedTopicX = "stream-partition-assignor-test-topicX";
        String prefixedTopicZ = "stream-partition-assignor-test-topicZ";
        List<String> subscribedTopics = Arrays.asList("topic1", prefixedTopicX, prefixedTopicZ);
        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.createDefaultMockTaskManager();

        List<Map<String, List<TopicPartitionInfo>>> internalTopicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(4, Collections.singletonList(Utils.mkSet((Object[])new String[]{prefixedTopicX, prefixedTopicZ})));
        MockInternalTopicManager topicManager = this.configurePartitionAssignorWith(Collections.emptyMap(), internalTopicPartitionInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack3")));
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));

        Assertions.assertEquals(2, topicManager.readyTopics.size());
        long readyValueForTopicZ = topicManager.readyTopics.get(prefixedTopicZ).intValue();
        Assertions.assertEquals((long)expectedTasks.size(), readyValueForTopicZ);
    }

    /**
     * Verifies that every partition of the created internal topics is handed out by the assignor.
     *
     * <p>The topology maps a stream read from {@code topic1} and joins it with a table that is
     * read from {@code topic3}, re-grouped and counted. The test expects the mock internal topic
     * manager to report two changelog and two repartition topics, each with the value 4, and
     * expects the single member's assignment to consist of {@code topic1} partitions 0-2,
     * {@code topic3} partitions 0-3 and partitions 0-3 of both repartition topics.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldGenerateTasksForAllCreatedPartitions(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);

        // Internal topic names are spelled once here and reused for the setup and the expectations.
        String tableChangelog = "stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog";
        String aggregateChangelog = "stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog";
        String aggregateRepartition = "stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition";
        String mapRepartition = "stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition";

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream1 = streamsBuilder.stream("topic1").map(KeyValue::new);
        KTable table1 = streamsBuilder.table("topic3").groupBy(KeyValue::new).count();
        stream1.join(table1, (value1, value2) -> null);

        // Member id used both for the subscription and for looking up the resulting assignment.
        String client = "client1";
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(this.configProps(parameterizedConfig)));
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Arrays.asList(tableChangelog, aggregateChangelog), Arrays.asList(4, 4)), true);
        this.createDefaultMockTaskManager();
        List<Map<String, List<TopicPartitionInfo>>> topicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(4, Arrays.asList(Utils.mkSet((Object[])new String[]{tableChangelog, aggregateChangelog}), Utils.mkSet((Object[])new String[]{aggregateRepartition, mapRepartition})));
        MockInternalTopicManager mockInternalTopicManager = this.configurePartitionAssignorWith(Collections.emptyMap(), topicPartitionInfo, parameterizedConfig);

        this.subscriptions.put(client, new ConsumerPartitionAssignor.Subscription(Arrays.asList("topic1", "topic3"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack4")));
        Map assignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        HashMap<String, Integer> expectedCreatedInternalTopics = new HashMap<String, Integer>();
        expectedCreatedInternalTopics.put(aggregateRepartition, 4);
        expectedCreatedInternalTopics.put(aggregateChangelog, 4);
        expectedCreatedInternalTopics.put(tableChangelog, 4);
        expectedCreatedInternalTopics.put(mapRepartition, 4);
        MatcherAssert.assertThat(mockInternalTopicManager.readyTopics, (Matcher)CoreMatchers.equalTo(expectedCreatedInternalTopics));

        // topic1 contributes partitions 0-2; topic3 and both repartition topics contribute 0-3.
        Set<TopicPartition> expectedAssignment = new HashSet<TopicPartition>();
        for (int partition = 0; partition < 3; partition++) {
            expectedAssignment.add(new TopicPartition("topic1", partition));
        }
        for (String fourPartitionTopic : Arrays.asList("topic3", aggregateRepartition, mapRepartition)) {
            for (int partition = 0; partition < 4; partition++) {
                expectedAssignment.add(new TopicPartition(fourPartitionTopic, partition));
            }
        }
        // Compared as sets, so the order in which the assignor lists partitions is irrelevant.
        MatcherAssert.assertThat(new HashSet(((ConsumerPartitionAssignor.Assignment)assignment.get(client)).partitions()), (Matcher)CoreMatchers.equalTo(expectedAssignment));
    }

    /**
     * Verifies that a {@link TimeoutException} raised while internal topics are being made ready
     * propagates out of {@code assign}.
     *
     * <p>The topology repartitions a stream read from {@code topic1}, and the internal topic
     * manager is replaced with one whose {@code makeReady} always throws.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").repartition();

        // Member id of the single subscribing consumer.
        String client = "client1";
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.createDefaultMockTaskManager();
        this.partitionAssignor.configure(this.configProps(parameterizedConfig));

        // NOTE(review): makeReady throws for every call, including one with an empty topic map, so
        // this test would also pass if no repartition topic were requested - confirm that is intended.
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager((Time)this.time, new StreamsConfig(this.configProps(parameterizedConfig)), this.mockClientSupplier.restoreConsumer, false){

            @Override
            public Set<String> makeReady(Map<String, InternalTopicConfig> topics) {
                throw new TimeoutException("KABOOM!");
            }
        };
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);

        this.subscriptions.put(client, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        Assertions.assertThrows(TimeoutException.class, () -> this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)));
    }

    /**
     * Verifies that a {@link TimeoutException} raised while non-empty sets of internal topics are
     * being made ready propagates out of {@code assign}.
     *
     * <p>The topology materializes {@code topic1} as a table backed by the store {@code store}.
     * The internal topic manager is replaced with one whose {@code makeReady} returns an empty set
     * for an empty topic map and throws for any non-empty one.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        StreamsConfig config = new StreamsConfig(this.configProps(parameterizedConfig));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic1", Materialized.as((String)"store"));

        // Member id of the single subscribing consumer.
        String client = "client1";
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.topologyMetadata = new TopologyMetadata(this.builder, config);
        this.createDefaultMockTaskManager();
        this.partitionAssignor.configure(this.configProps(parameterizedConfig));

        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager((Time)this.time, config, this.mockClientSupplier.restoreConsumer, false){

            @Override
            public Set<String> makeReady(Map<String, InternalTopicConfig> topics) {
                // Calls with nothing to create succeed; only a real creation request times out.
                if (topics.isEmpty()) {
                    return Collections.emptySet();
                }
                throw new TimeoutException("KABOOM!");
            }
        };
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);

        this.subscriptions.put(client, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        Assertions.assertThrows(TimeoutException.class, () -> this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)));
    }

    /**
     * Verifies that the endpoint configured through {@code application.server} is carried in the
     * subscription user data produced by the assignor.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldAddUserDefinedEndPointToSubscription(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.builder.addSource(null, "source", null, null, null, new String[]{"input"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", null, null, null, new String[]{"processor"});
        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("application.server", USER_END_POINT), parameterizedConfig);

        Set subscribedTopics = Utils.mkSet((Object[])new String[]{"input"});
        ByteBuffer encodedUserData = this.partitionAssignor.subscriptionUserData(subscribedTopics);

        // Round-trip the user data through a Subscription before decoding it.
        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList(subscribedTopics), encodedUserData);
        SubscriptionInfo decodedInfo = SubscriptionInfo.decode((ByteBuffer)subscription.userData());

        Assertions.assertEquals((Object)USER_END_POINT, (Object)decodedInfo.userEndPoint());
    }

    /**
     * Verifies that the assignment info maps the user endpoint to the partitions it hosts.
     *
     * <p>A single consumer subscribes to {@code topic1} and advertises {@code USER_END_POINT}.
     * The decoded assignment must list partitions 0-2 of {@code topic1} under the host
     * {@code localhost:8080}.
     *
     * @param parameterizedConfig configuration passed to {@code setUp} and to the assignor configuration
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldMapUserEndPointToTopicPartitions(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);
        this.builder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", null, null, null, new String[]{"processor"});
        List<String> subscribedTopics = Collections.singletonList("topic1");
        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("application.server", USER_END_POINT), parameterizedConfig);

        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(subscribedTopics, AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode(), Collections.emptyList(), -1, Optional.of("rack2")));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        ConsumerPartitionAssignor.Assignment memberAssignment = (ConsumerPartitionAssignor.Assignment)assignments.get(CONSUMER_1);
        AssignmentInfo decodedInfo = AssignmentInfo.decode((ByteBuffer)memberAssignment.userData());
        Set hostedPartitions = (Set)decodedInfo.partitionsByHost().get(new HostInfo("localhost", 8080));

        Object expectedPartitions = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)});
        Assertions.assertEquals(expectedPartitions, (Object)hostedPartitions);
    }

    /**
     * Checks that configuring the assignor with an {@code application.server} value that has no
     * port part ({@code "localhost"}) is rejected with a {@link ConfigException}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.createDefaultMockTaskManager();
        // assertThrows fails the test when no ConfigException is raised, so no manual fail()/catch is needed.
        Assertions.assertThrows(ConfigException.class, () -> this.configurePartitionAssignorWith(Collections.singletonMap("application.server", "localhost"), parameterizedConfig));
    }

    /**
     * Checks that an {@code application.server} value whose port part is not numeric
     * ({@code "localhost:j87yhk"}) makes assignor configuration throw a {@link ConfigException}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        createDefaultMockTaskManager();
        Map<String, Object> badServerConfig = Collections.singletonMap("application.server", "localhost:j87yhk");
        Assertions.assertThrows(
            ConfigException.class,
            () -> configurePartitionAssignorWith(badServerConfig, parameterizedConfig));
    }

    /**
     * Builds a topology that joins a stream derived from {@code topic1} with a stream read from
     * {@code unknownTopic}, subscribes a single member to {@code unknownTopic} only, and checks that
     * {@code assign} returns with no ready internal topics and no partitions for that member.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, true);
        String client = "client1";

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream countedStream = streamsBuilder.stream("topic1")
            .selectKey((key, value) -> null)
            .groupByKey()
            .count(Materialized.as((String)"count"))
            .toStream()
            .map((key, value) -> null);
        streamsBuilder.stream("unknownTopic")
            .selectKey((key, value) -> null)
            .join(countedStream, (value1, value2) -> null, JoinWindows.of((Duration)Duration.ofMillis(0L)));
        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());

        MockInternalTopicManager internalTopicManager = configureDefault(parameterizedConfig);
        ByteBuffer encodedInfo = defaultSubscriptionInfo.encode();
        subscriptions.put(client, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), encodedInfo, Collections.emptyList(), -1, Optional.of("rack2")));

        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        MatcherAssert.assertThat((Object)internalTopicManager.readyTopics.isEmpty(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)groupAssignment.get(client).partitions().isEmpty(), (Matcher)CoreMatchers.equalTo((Object)true));
    }

    /**
     * Delivers two assignments built from different host-state maps to {@code onAssignment} and
     * verifies that the mocked {@code StreamsMetadataState} received an {@code onChange} call whose
     * first argument equals each of those maps.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        Map<HostInfo, Set<TopicPartition>> firstHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(t1p0, t1p1)),
            Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(t2p0, t2p1)));
        // Same as above except the second host is renamed.
        Map<HostInfo, Set<TopicPartition>> secondHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(t1p0, t1p1)),
            Utils.mkEntry(new HostInfo("newotherhost", 9090), Utils.mkSet(t2p0, t2p1)));

        streamsMetadataState = (StreamsMetadataState)Mockito.mock(StreamsMetadataState.class);
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor(parameterizedConfig);

        partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(firstHostState), null);
        partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(secondHostState), null);

        Mockito.verify(streamsMetadataState).onChange((Map)ArgumentMatchers.eq((Object)firstHostState), (Map)ArgumentMatchers.any(), (Map)ArgumentMatchers.any());
        Mockito.verify(streamsMetadataState).onChange((Map)ArgumentMatchers.eq((Object)secondHostState), (Map)ArgumentMatchers.any(), (Map)ArgumentMatchers.any());
    }

    /**
     * With {@code application.server} set to {@code newhost:9090}, checks the value of
     * {@code referenceContainer.nextScheduledRebalanceMs} after two assignments: it is {@code 0}
     * after an assignment whose host state does not list {@code newhost}, and
     * {@code Long.MAX_VALUE} after an assignment whose host state does.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldTriggerImmediateRebalanceOnHostInfoChange(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);

        Map<HostInfo, Set<TopicPartition>> staleHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(t1p0, t1p1)),
            Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(t2p0, t2p1)));
        Map<HostInfo, Set<TopicPartition>> currentHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("newhost", 9090), Utils.mkSet(t1p0, t1p1)),
            Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(t2p0, t2p1)));

        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("application.server", "newhost:9090"), parameterizedConfig);

        partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(staleHostState), null);
        MatcherAssert.assertThat((Object)referenceContainer.nextScheduledRebalanceMs.get(), (Matcher)Matchers.is((Object)0L));

        partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(currentHostState), null);
        MatcherAssert.assertThat((Object)referenceContainer.nextScheduledRebalanceMs.get(), (Matcher)Matchers.is((Object)Long.MAX_VALUE));
    }

    /**
     * Two members subscribe to {@code topic1}: {@code CONSUMER_1} reports all three partitions as
     * owned, {@code CONSUMER_2} owns none. After {@code assign}, checks that {@code CONSUMER_1} no
     * longer gets exactly its previous partition list, that {@code CONSUMER_2} receives no
     * partitions, active tasks or standby tasks, and that handing {@code CONSUMER_2}'s assignment to
     * {@code onAssignment} leaves {@code nextScheduledRebalanceMs} at {@code 0}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldTriggerImmediateRebalanceOnTasksRevoked(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, true);
        builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});

        Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        List<TopicPartition> allPartitions = Arrays.asList(t1p0, t1p1, t1p2);
        List<String> subscribedTopics = Collections.singletonList("topic1");

        ByteBuffer ownerInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, allTasks, AssignmentTestUtils.EMPTY_TASKS).encode();
        subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(subscribedTopics, ownerInfo, allPartitions, -1, Optional.of("rack0")));
        ByteBuffer joinerInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, allTasks).encode();
        subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(subscribedTopics, joinerInfo, Collections.emptyList(), -1, Optional.of("rack0")));

        createMockTaskManager(allTasks, allTasks);
        configurePartitionAssignorWith(Collections.singletonMap("acceptable.recovery.lag", 0L), parameterizedConfig);

        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        ConsumerPartitionAssignor.Assignment ownerAssignment = groupAssignment.get(CONSUMER_1);
        ConsumerPartitionAssignor.Assignment joinerAssignment = groupAssignment.get(CONSUMER_2);

        MatcherAssert.assertThat((Object)ownerAssignment.partitions(), (Matcher)CoreMatchers.not(allPartitions));
        MatcherAssert.assertThat((Object)joinerAssignment.partitions(), (Matcher)CoreMatchers.equalTo(Collections.emptyList()));
        MatcherAssert.assertThat((Object)AssignmentInfo.decode(joinerAssignment.userData()).activeTasks(), (Matcher)CoreMatchers.equalTo(Collections.emptyList()));
        MatcherAssert.assertThat((Object)AssignmentInfo.decode(joinerAssignment.userData()).standbyTasks(), (Matcher)CoreMatchers.equalTo(Collections.emptyMap()));

        partitionAssignor.onAssignment(joinerAssignment, null);
        MatcherAssert.assertThat((Object)referenceContainer.nextScheduledRebalanceMs.get(), (Matcher)Matchers.is((Object)0L));
    }

    /**
     * With one standby replica configured and two members on different endpoints, checks the
     * host-keyed maps decoded from {@code CONSUMER_1}'s assignment: neither host's active set equals
     * all of {@code topic1}'s partitions, each host's active set equals the other host's standby
     * set, and the union of both active sets covers all three partitions.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Map<String, Object> props = configProps(parameterizedConfig);
        props.put("num.standby.replicas", 1);
        props.put("application.server", USER_END_POINT);

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("topic1").groupByKey().count();
        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(props));
        createDefaultMockTaskManager();

        String changelogTopic = "stream-partition-assignor-test-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog";
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(
            StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList(changelogTopic), Collections.singletonList(3)),
            true);
        List<Map<String, List<TopicPartitionInfo>>> changelogPartitionInfo =
            StreamsPartitionAssignorTest.getTopicPartitionInfo(3, Collections.singletonList(Utils.mkSet(changelogTopic)));
        configurePartitionAssignorWith(props, changelogPartitionInfo, parameterizedConfig);

        List<String> subscribedTopics = Collections.singletonList("topic1");
        ByteBuffer firstMemberInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode();
        subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(subscribedTopics, firstMemberInfo, Collections.emptyList(), -1, Optional.of("rack3")));
        ByteBuffer secondMemberInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, OTHER_END_POINT).encode();
        subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(subscribedTopics, secondMemberInfo, Collections.emptyList(), -1, Optional.of("rack1")));

        Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        AssignmentInfo decodedInfo = AssignmentInfo.decode(groupAssignment.get(CONSUMER_1).userData());

        HostInfo firstHost = new HostInfo("localhost", 8080);
        HostInfo secondHost = new HostInfo("other", 9090);
        Set<TopicPartition> firstHostActive = decodedInfo.partitionsByHost().get(firstHost);
        Set<TopicPartition> secondHostActive = decodedInfo.partitionsByHost().get(secondHost);
        Set<TopicPartition> firstHostStandby = decodedInfo.standbyPartitionByHost().get(firstHost);
        Set<TopicPartition> secondHostStandby = decodedInfo.standbyPartitionByHost().get(secondHost);

        Set<TopicPartition> activeUnion = new HashSet<>(firstHostActive);
        activeUnion.addAll(secondHostActive);

        MatcherAssert.assertThat((Object)firstHostActive, (Matcher)CoreMatchers.not((Object)allPartitions));
        MatcherAssert.assertThat((Object)secondHostActive, (Matcher)CoreMatchers.not((Object)allPartitions));
        MatcherAssert.assertThat((Object)firstHostActive, (Matcher)CoreMatchers.equalTo((Object)secondHostStandby));
        MatcherAssert.assertThat((Object)secondHostActive, (Matcher)CoreMatchers.equalTo((Object)firstHostStandby));
        MatcherAssert.assertThat(activeUnion, (Matcher)CoreMatchers.equalTo((Object)allPartitions));
    }

    /**
     * Checks that {@code configure} throws a {@link KafkaException} with the message
     * {@code "ReferenceContainer is not specified"} when the
     * {@code __reference.container.instance__} entry is removed from the configuration.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowKafkaExceptionIfReferenceContainerNotConfigured(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Map<String, Object> assignorProps = configProps(parameterizedConfig);
        assignorProps.remove("__reference.container.instance__");

        KafkaException thrown = Assertions.assertThrows(
            KafkaException.class,
            () -> partitionAssignor.configure(assignorProps));
        String actualMessage = thrown.getMessage();
        MatcherAssert.assertThat((Object)actualMessage, (Matcher)CoreMatchers.equalTo((Object)"ReferenceContainer is not specified"));
    }

    /**
     * Checks that {@code configure} throws a {@link KafkaException} naming the offending type when
     * the {@code __reference.container.instance__} entry holds a {@code String} instead of a
     * reference container.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowKafkaExceptionIfReferenceContainerConfigIsNotTaskManagerInstance(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Map<String, Object> assignorProps = configProps(parameterizedConfig);
        assignorProps.put("__reference.container.instance__", "i am not a reference container");

        KafkaException thrown = Assertions.assertThrows(
            KafkaException.class,
            () -> partitionAssignor.configure(assignorProps));
        String actualMessage = thrown.getMessage();
        MatcherAssert.assertThat((Object)actualMessage, (Matcher)CoreMatchers.equalTo((Object)"java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer"));
    }

    /**
     * Runs the lowest-assignment-version check with subscription versions 1 and 2.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2(Map<String, Object> parameterizedConfig) {
        final int lowerVersion = 1;
        final int higherVersion = 2;
        setUp(parameterizedConfig, true);
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion, parameterizedConfig);
    }

    /**
     * Runs the lowest-assignment-version check with subscription versions 1 and 3.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3(Map<String, Object> parameterizedConfig) {
        final int lowerVersion = 1;
        final int higherVersion = 3;
        setUp(parameterizedConfig, true);
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion, parameterizedConfig);
    }

    /**
     * Runs the lowest-assignment-version check with subscription versions 2 and 3.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3(Map<String, Object> parameterizedConfig) {
        final int lowerVersion = 2;
        final int higherVersion = 3;
        setUp(parameterizedConfig, true);
        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion, parameterizedConfig);
    }

    /**
     * Registers two members whose subscription infos are encoded at {@code smallestVersion} and
     * {@code otherVersion}, runs {@code assign}, and checks that both resulting assignments decode
     * to {@code smallestVersion}.
     *
     * @param smallestVersion    subscription version used for {@code CONSUMER_1}; also the expected assignment version
     * @param otherVersion       subscription version used for {@code CONSUMER_2}
     * @param paramterizedObject configuration forwarded to {@code configureDefault}
     */
    private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(int smallestVersion, int otherVersion, Map<String, Object> paramterizedObject) {
        List<String> subscribedTopics = Collections.singletonList("topic1");

        ByteBuffer firstMemberInfo = StreamsPartitionAssignorTest.getInfoForOlderVersion(smallestVersion, AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode();
        subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(subscribedTopics, firstMemberInfo, Collections.emptyList(), -1, Optional.of("rack2")));
        ByteBuffer secondMemberInfo = StreamsPartitionAssignorTest.getInfoForOlderVersion(otherVersion, AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode();
        subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(subscribedTopics, secondMemberInfo, Collections.emptyList(), -1, Optional.of("rack1")));

        configureDefault(paramterizedObject);
        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        MatcherAssert.assertThat((Object)groupAssignment.size(), (Matcher)CoreMatchers.equalTo((Object)2));
        AssignmentInfo firstMemberAssignment = AssignmentInfo.decode(groupAssignment.get(CONSUMER_1).userData());
        MatcherAssert.assertThat((Object)firstMemberAssignment.version(), (Matcher)CoreMatchers.equalTo((Object)smallestVersion));
        AssignmentInfo secondMemberAssignment = AssignmentInfo.decode(groupAssignment.get(CONSUMER_2).userData());
        MatcherAssert.assertThat((Object)secondMemberAssignment.version(), (Matcher)CoreMatchers.equalTo((Object)smallestVersion));
    }

    /**
     * Checks that with {@code upgrade.from} set to {@code StreamsConfig.UPGRADE_FROM_0100} the
     * subscription user data produced by the assignor decodes to version 1.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldDownGradeSubscriptionToVersion1(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", StreamsConfig.UPGRADE_FROM_0100), parameterizedConfig);

        Set<String> subscribedTopics = Utils.mkSet("topic1");
        ByteBuffer userData = partitionAssignor.subscriptionUserData(subscribedTopics);
        ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(new ArrayList<>(subscribedTopics), userData);

        int encodedVersion = SubscriptionInfo.decode(subscription.userData()).version();
        MatcherAssert.assertThat((Object)encodedVersion, (Matcher)CoreMatchers.equalTo((Object)1));
    }

    /**
     * Runs the version-2 downgrade check with {@code upgrade.from} set to
     * {@code StreamsConfig.UPGRADE_FROM_0101}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldDownGradeSubscriptionToVersion2For0101(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Object upgradeFrom = StreamsConfig.UPGRADE_FROM_0101;
        shouldDownGradeSubscriptionToVersion2(upgradeFrom, parameterizedConfig);
    }

    /**
     * Runs the version-2 downgrade check with {@code upgrade.from} set to
     * {@code StreamsConfig.UPGRADE_FROM_0102}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldDownGradeSubscriptionToVersion2For0102(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Object upgradeFrom = StreamsConfig.UPGRADE_FROM_0102;
        shouldDownGradeSubscriptionToVersion2(upgradeFrom, parameterizedConfig);
    }

    /**
     * Runs the version-2 downgrade check with {@code upgrade.from} set to
     * {@code StreamsConfig.UPGRADE_FROM_0110}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldDownGradeSubscriptionToVersion2For0110(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Object upgradeFrom = StreamsConfig.UPGRADE_FROM_0110;
        shouldDownGradeSubscriptionToVersion2(upgradeFrom, parameterizedConfig);
    }

    /**
     * Runs the version-2 downgrade check with {@code upgrade.from} set to
     * {@code StreamsConfig.UPGRADE_FROM_10}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldDownGradeSubscriptionToVersion2For10(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Object upgradeFrom = StreamsConfig.UPGRADE_FROM_10;
        shouldDownGradeSubscriptionToVersion2(upgradeFrom, parameterizedConfig);
    }

    /**
     * Runs the version-2 downgrade check with {@code upgrade.from} set to
     * {@code StreamsConfig.UPGRADE_FROM_11}.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldDownGradeSubscriptionToVersion2For11(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Object upgradeFrom = StreamsConfig.UPGRADE_FROM_11;
        shouldDownGradeSubscriptionToVersion2(upgradeFrom, parameterizedConfig);
    }

    /**
     * Configures the assignor with the given {@code upgrade.from} value and checks that the
     * subscription user data it produces for {@code topic1} decodes to version 2.
     *
     * @param upgradeFromValue    value stored under the {@code upgrade.from} configuration key
     * @param parameterizedConfig configuration forwarded to {@code configurePartitionAssignorWith}
     */
    private void shouldDownGradeSubscriptionToVersion2(Object upgradeFromValue, Map<String, Object> parameterizedConfig) {
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", upgradeFromValue), parameterizedConfig);

        Set<String> subscribedTopics = Utils.mkSet("topic1");
        ByteBuffer userData = partitionAssignor.subscriptionUserData(subscribedTopics);
        ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(new ArrayList<>(subscribedTopics), userData);

        int encodedVersion = SubscriptionInfo.decode(subscription.userData()).version();
        MatcherAssert.assertThat((Object)encodedVersion, (Matcher)CoreMatchers.equalTo((Object)2));
    }

    /**
     * {@code CONSUMER_1} reports all three {@code topic1} partitions as owned and {@code CONSUMER_2}
     * joins owning nothing. After {@code assign}, checks that {@code CONSUMER_2} receives no
     * partitions and that its decoded assignment info has version 11, no active tasks, empty
     * host-keyed partition maps and error code 0.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, true);
        builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});

        Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        List<String> subscribedTopics = Collections.singletonList("topic1");

        ByteBuffer ownerInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, allTasks, AssignmentTestUtils.EMPTY_TASKS).encode();
        subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(subscribedTopics, ownerInfo, Arrays.asList(t1p0, t1p1, t1p2), -1, Optional.of("rack1")));
        ByteBuffer joinerInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode();
        subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(subscribedTopics, joinerInfo, Collections.emptyList(), -1, Optional.of("rack2")));

        createMockTaskManager(allTasks, allTasks);
        configureDefaultPartitionAssignor(parameterizedConfig);

        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        MatcherAssert.assertThat((Object)groupAssignment.size(), (Matcher)CoreMatchers.equalTo((Object)2));

        ConsumerPartitionAssignor.Assignment joinerAssignment = groupAssignment.get(CONSUMER_2);
        MatcherAssert.assertThat((Object)joinerAssignment.partitions(), (Matcher)CoreMatchers.equalTo(Collections.emptyList()));

        AssignmentInfo joinerAssignmentInfo = AssignmentInfo.decode(joinerAssignment.userData());
        MatcherAssert.assertThat((Object)joinerAssignmentInfo.version(), (Matcher)Matchers.is((Object)11));
        MatcherAssert.assertThat((Object)joinerAssignmentInfo.activeTasks(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)joinerAssignmentInfo.partitionsByHost(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)joinerAssignmentInfo.standbyPartitionByHost(), (Matcher)Matchers.anEmptyMap());
        MatcherAssert.assertThat((Object)joinerAssignmentInfo.errCode(), (Matcher)Matchers.is((Object)0));
    }

    /**
     * Both members send the subscription produced by {@code encodeFutureSubscription()}. After
     * {@code assign}, checks that {@code CONSUMER_1} gets partitions 0 and 2 of {@code topic1} with
     * tasks {@code TASK_0_0}/{@code TASK_0_2}, that {@code CONSUMER_2} gets partition 1 with
     * {@code TASK_0_1}, and that both assignment infos are version 11 with empty standby and
     * host maps and error code 0.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, true);
        builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});

        Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        List<String> subscribedTopics = Collections.singletonList("topic1");

        subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.encodeFutureSubscription(), Collections.emptyList(), -1, Optional.of("rack1")));
        subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.encodeFutureSubscription(), Collections.emptyList(), -1, Optional.of("rack1")));

        createMockTaskManager(allTasks, allTasks);
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1), parameterizedConfig);

        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment =
            partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        MatcherAssert.assertThat((Object)groupAssignment.size(), (Matcher)CoreMatchers.equalTo((Object)2));

        ConsumerPartitionAssignor.Assignment firstAssignment = groupAssignment.get(CONSUMER_1);
        MatcherAssert.assertThat((Object)firstAssignment.partitions(), (Matcher)CoreMatchers.equalTo(Arrays.asList(t1p0, t1p2)));
        MatcherAssert.assertThat(
            (Object)AssignmentInfo.decode(firstAssignment.userData()),
            (Matcher)CoreMatchers.equalTo((Object)new AssignmentInfo(11, Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0)));

        ConsumerPartitionAssignor.Assignment secondAssignment = groupAssignment.get(CONSUMER_2);
        MatcherAssert.assertThat((Object)secondAssignment.partitions(), (Matcher)CoreMatchers.equalTo(Collections.singletonList(t1p1)));
        MatcherAssert.assertThat(
            (Object)AssignmentInfo.decode(secondAssignment.userData()),
            (Matcher)CoreMatchers.equalTo((Object)new AssignmentInfo(11, Collections.singletonList(AssignmentTestUtils.TASK_0_1), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0)));
    }

    /**
     * Runs the mixed pre-version-probing/future subscription check with old subscription version 1.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldEncodeAssignmentErrorIfV1SubscriptionAndFutureSubscriptionIsMixed(Map<String, Object> parameterizedConfig) {
        final int oldVersion = 1;
        setUp(parameterizedConfig, false);
        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(oldVersion, parameterizedConfig);
    }

    /**
     * Runs the mixed pre-version-probing/future subscription check with old subscription version 2.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldEncodeAssignmentErrorIfV2SubscriptionAndFutureSubscriptionIsMixed(Map<String, Object> parameterizedConfig) {
        final int oldVersion = 2;
        setUp(parameterizedConfig, false);
        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(oldVersion, parameterizedConfig);
    }

    /**
     * Hand-builds a topology in which a single {@code input-stream} source fans out into two
     * branches, each passing through two internal repartition topics, before both branches are
     * merged again, and then runs {@code assign} for one subscribed member.
     *
     * <p>The method contains no assertions: it passes when {@code assign} returns without throwing.
     *
     * @param parameterizedConfig configuration supplied by the {@code parameter} method source
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, true);
        // Source and the branch node with its two children (0003 and 0004).
        this.builder.addSource(null, "KSTREAM-SOURCE-0000000000", null, null, null, new String[]{"input-stream"});
        this.builder.addProcessor("KSTREAM-FLATMAPVALUES-0000000001", new MockApiProcessorSupplier(), new String[]{"KSTREAM-SOURCE-0000000000"});
        this.builder.addProcessor("KSTREAM-BRANCH-0000000002", new MockApiProcessorSupplier(), new String[]{"KSTREAM-FLATMAPVALUES-0000000001"});
        this.builder.addProcessor("KSTREAM-BRANCHCHILD-0000000003", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCH-0000000002"});
        this.builder.addProcessor("KSTREAM-BRANCHCHILD-0000000004", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCH-0000000002"});
        // Branch child 0003: first repartition hop through "odd_store-repartition".
        this.builder.addProcessor("KSTREAM-MAP-0000000005", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCHCHILD-0000000003"});
        this.builder.addInternalTopic("odd_store-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("odd_store-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000005"});
        this.builder.addSink("odd_store-repartition-sink", "odd_store-repartition", null, null, null, new String[]{"odd_store-repartition-filter"});
        this.builder.addSource(null, "odd_store-repartition-source", null, null, null, new String[]{"odd_store-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000006", new MockApiProcessorSupplier(), new String[]{"odd_store-repartition-source"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000010", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000006"});
        this.builder.addProcessor("KSTREAM-PEEK-0000000011", new MockApiProcessorSupplier(), new String[]{"KTABLE-TOSTREAM-0000000010"});
        // Second repartition hop on the same branch through "odd_store_2-repartition".
        this.builder.addProcessor("KSTREAM-MAP-0000000012", new MockApiProcessorSupplier(), new String[]{"KSTREAM-PEEK-0000000011"});
        this.builder.addInternalTopic("odd_store_2-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("odd_store_2-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000012"});
        this.builder.addSink("odd_store_2-repartition-sink", "odd_store_2-repartition", null, null, null, new String[]{"odd_store_2-repartition-filter"});
        this.builder.addSource(null, "odd_store_2-repartition-source", null, null, null, new String[]{"odd_store_2-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000013", new MockApiProcessorSupplier(), new String[]{"odd_store_2-repartition-source"});
        // Branch child 0004: first repartition hop through "even_store-repartition".
        this.builder.addProcessor("KSTREAM-MAP-0000000017", new MockApiProcessorSupplier(), new String[]{"KSTREAM-BRANCHCHILD-0000000004"});
        this.builder.addInternalTopic("even_store-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("even_store-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000017"});
        this.builder.addSink("even_store-repartition-sink", "even_store-repartition", null, null, null, new String[]{"even_store-repartition-filter"});
        this.builder.addSource(null, "even_store-repartition-source", null, null, null, new String[]{"even_store-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000018", new MockApiProcessorSupplier(), new String[]{"even_store-repartition-source"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000022", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000018"});
        this.builder.addProcessor("KSTREAM-PEEK-0000000023", new MockApiProcessorSupplier(), new String[]{"KTABLE-TOSTREAM-0000000022"});
        // Second repartition hop on the same branch through "even_store_2-repartition".
        this.builder.addProcessor("KSTREAM-MAP-0000000024", new MockApiProcessorSupplier(), new String[]{"KSTREAM-PEEK-0000000023"});
        this.builder.addInternalTopic("even_store_2-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("even_store_2-repartition-filter", new MockApiProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000024"});
        this.builder.addSink("even_store_2-repartition-sink", "even_store_2-repartition", null, null, null, new String[]{"even_store_2-repartition-filter"});
        this.builder.addSource(null, "even_store_2-repartition-source", null, null, null, new String[]{"even_store_2-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000025", new MockApiProcessorSupplier(), new String[]{"even_store_2-repartition-source"});
        // Both branches are connected again: the merge node has the two join nodes as parents.
        this.builder.addProcessor("KTABLE-JOINTHIS-0000000030", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000013"});
        this.builder.addProcessor("KTABLE-JOINOTHER-0000000031", new MockApiProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000025"});
        this.builder.addProcessor("KTABLE-MERGE-0000000029", new MockApiProcessorSupplier(), new String[]{"KTABLE-JOINTHIS-0000000030", "KTABLE-JOINOTHER-0000000031"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000032", new MockApiProcessorSupplier(), new String[]{"KTABLE-MERGE-0000000029"});
        // Topic names the single member subscribes to.
        List<String> topics = Arrays.asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition");
        this.createDefaultMockTaskManager();
        // Topic-partition info handed to the assignor configuration: two entries, requested with a count of 4.
        List<Map<String, List<TopicPartitionInfo>>> repartitionTopics = StreamsPartitionAssignorTest.getTopicPartitionInfo(4, Arrays.asList(Utils.mkSet((Object[])new String[]{"stream-partition-assignor-test-odd_store-repartition"}), Utils.mkSet((Object[])new String[]{"stream-partition-assignor-test-odd_store-repartition", "stream-partition-assignor-test-odd_store_2-repartition", "stream-partition-assignor-test-even_store-repartition", "stream-partition-assignor-test-even_store_2-repartition"})));
        this.configurePartitionAssignorWith(Collections.emptyMap(), repartitionTopics, parameterizedConfig);
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack0")));
        // Local cluster metadata (used instead of this.metadata) that knows only partition 0 of "input-stream".
        Cluster metadata = new Cluster("cluster", Arrays.asList(AssignmentTestUtils.NODE_0, AssignmentTestUtils.NODE_1, AssignmentTestUtils.NODE_3), Collections.singletonList(new PartitionInfo("input-stream", 0, AssignmentTestUtils.NODE_0, AssignmentTestUtils.REPLICA_0, AssignmentTestUtils.REPLICA_0)), Collections.emptySet(), Collections.emptySet());
        // The result is discarded; the call only has to complete.
        this.partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
    }

    /**
     * Configures the assignor with explicit values for the four assignment-related
     * settings and asserts that each accessor returns the configured value.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldGetAssignmentConfigs(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        createDefaultMockTaskManager();

        // Start from the shared test configuration and override the settings under test.
        Map<String, Object> assignorProps = configProps(parameterizedConfig);
        assignorProps.put("acceptable.recovery.lag", 11);
        assignorProps.put("max.warmup.replicas", 33);
        assignorProps.put("num.standby.replicas", 44);
        assignorProps.put("probing.rebalance.interval.ms", 3300000L);
        partitionAssignor.configure(assignorProps);

        MatcherAssert.assertThat(partitionAssignor.acceptableRecoveryLag(), CoreMatchers.equalTo(11L));
        MatcherAssert.assertThat(partitionAssignor.maxWarmupReplicas(), CoreMatchers.equalTo(33));
        MatcherAssert.assertThat(partitionAssignor.numStandbyReplicas(), CoreMatchers.equalTo(44));
        MatcherAssert.assertThat(partitionAssignor.probingRebalanceIntervalMs(), CoreMatchers.equalTo(3300000L));
    }

    /**
     * Sets the test clock to {@code Long.MAX_VALUE} and asserts that the time object
     * reachable through a freshly built {@code AssignorConfiguration} reports that value.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldGetTime(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        final long expectedNowMs = Long.MAX_VALUE;
        time.setCurrentTimeMs(expectedNowMs);
        createDefaultMockTaskManager();

        Map<String, Object> assignorProps = configProps(parameterizedConfig);
        AssignorConfiguration configuration = new AssignorConfiguration(assignorProps);

        MatcherAssert.assertThat(configuration.referenceContainer().time.milliseconds(), CoreMatchers.equalTo(expectedNowMs));
    }

    /**
     * Expects {@code assign} to throw {@link IllegalStateException} when both the mocked
     * admin client's end offsets and the changelog topic metadata are built with one
     * partition fewer than {@code changelogNumPartitions}.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogEndOffsets(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        int changelogNumPartitions = 3;
        // Deliberately one short. This value used to be a bare literal 2 repeated twice
        // while changelogNumPartitions was declared but never read.
        int reportedNumPartitions = changelogNumPartitions - 1;
        String changelogTopic = "stream-partition-assignor-test-store1-changelog";

        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});

        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList(changelogTopic), Collections.singletonList(reportedNumPartitions)), true);
        this.createDefaultMockTaskManager();
        List<Map<String, List<TopicPartitionInfo>>> changelogTopicPartitionInfo = StreamsPartitionAssignorTest.getTopicPartitionInfo(reportedNumPartitions, Collections.singletonList(Utils.mkSet(changelogTopic)));
        this.configurePartitionAssignorWith(Collections.emptyMap(), changelogTopicPartitionInfo, parameterizedConfig);

        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack1")));
        Assertions.assertThrows(IllegalStateException.class, () -> this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)));
    }

    /**
     * Registers two state stores on one processor but supplies end offsets and topic
     * metadata for only the {@code store1} changelog, then expects {@code assign} to
     * throw {@link IllegalStateException}.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOffsets(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        final String store1Changelog = "stream-partition-assignor-test-store1-changelog";

        builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store2", false), new String[]{"processor1"});

        // Only store1's changelog is known to the mocked admin client.
        Map<TopicPartition, Long> knownEndOffsets = getTopicPartitionOffsetsMap(Collections.singletonList(store1Changelog), Collections.singletonList(3));
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(knownEndOffsets, true);
        createDefaultMockTaskManager();

        List<Map<String, List<TopicPartitionInfo>>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, Collections.singletonList(Utils.mkSet(store1Changelog)));
        configurePartitionAssignorWith(Collections.emptyMap(), changelogTopicPartitionInfo, parameterizedConfig);

        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack3"));
        subscriptions.put("consumer10", subscription);

        Assertions.assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)));
    }

    /**
     * Runs an assignment in which the mocked admin client only answers
     * {@code listOffsets} for an empty request map, so the test completes only if no
     * offsets are requested for specific partitions.
     *
     * <p>The unused {@code KafkaFutureImpl} local that was created and completed without
     * ever being attached to a mock has been removed.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        this.adminClient = (Admin)Mockito.mock(AdminClient.class);
        ListOffsetsResult result = (ListOffsetsResult)Mockito.mock(ListOffsetsResult.class);
        // The stub matches only an empty partition map.
        Mockito.when((Object)this.adminClient.listOffsets(Collections.emptyMap())).thenReturn((Object)result);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack4")));
        this.configureDefault(parameterizedConfig);
        List<Map<String, List<TopicPartitionInfo>>> partitionInfo = Collections.singletonList(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"stream-partition-assignor-test-store-changelog", Collections.singletonList(new TopicPartitionInfo(0, new Node(1, "h1", 80), Collections.singletonList(new Node(1, "h1", 80)), Collections.emptyList())))}));
        // NOTE(review): the leading 'true' presumably marks the internal topics as newly created - confirm against the helper.
        this.overwriteInternalTopicManagerWithMock(true, partitionInfo, parameterizedConfig);
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
    }

    /**
     * Captures the argument passed to the mocked admin client's {@code listOffsets} during
     * an assignment and asserts that its key set equals the three partitions of the
     * {@code store} changelog topic.
     *
     * <p>The expected-partition set and the per-partition future are typed, so the
     * for-each loop over {@code TopicPartition} elements type-checks (a raw {@code Set}
     * only yields {@code Object}).
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldRequestEndOffsetsForPreexistingChangelogs(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        Set<TopicPartition> changelogs = Utils.mkSet(new TopicPartition("stream-partition-assignor-test-store-changelog", 0), new TopicPartition("stream-partition-assignor-test-store-changelog", 1), new TopicPartition("stream-partition-assignor-test-store-changelog", 2));
        this.adminClient = (Admin)Mockito.mock(AdminClient.class);
        ListOffsetsResult result = (ListOffsetsResult)Mockito.mock(ListOffsetsResult.class);
        // Every expected partition resolves to an already-completed future reporting Long.MAX_VALUE.
        for (TopicPartition entry : changelogs) {
            KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo> partitionFuture = new KafkaFutureImpl<>();
            ListOffsetsResult.ListOffsetsResultInfo info = (ListOffsetsResult.ListOffsetsResultInfo)Mockito.mock(ListOffsetsResult.ListOffsetsResultInfo.class);
            Mockito.when((Object)info.offset()).thenReturn((Object)Long.MAX_VALUE);
            partitionFuture.complete(info);
            Mockito.when((Object)result.partitionResult(entry)).thenReturn((Object)partitionFuture);
        }
        // The captor stays raw: its value type is declared outside this block.
        ArgumentCaptor capturedChangelogs = ArgumentCaptor.forClass(Map.class);
        Mockito.when((Object)this.adminClient.listOffsets((Map)capturedChangelogs.capture())).thenReturn((Object)result);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store", false), new String[]{"processor1"});
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack3")));
        this.configureDefault(parameterizedConfig);
        List<Map<String, List<TopicPartitionInfo>>> partitionInfo = Collections.singletonList(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"stream-partition-assignor-test-store-changelog", Collections.singletonList(new TopicPartitionInfo(0, new Node(1, "h1", 80), Collections.singletonList(new Node(1, "h1", 80)), Collections.emptyList())))}));
        this.overwriteInternalTopicManagerWithMock(false, partitionInfo, parameterizedConfig);
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        MatcherAssert.assertThat(((Map)capturedChangelogs.getValue()).keySet(), (Matcher)CoreMatchers.equalTo((Object)changelogs));
    }

    /**
     * Builds a topology from {@code StreamsBuilder.table("topic1", ...)} with
     * {@code topology.optimization} set to {@code all}, stubs the main consumer's
     * {@code committed} call for the three {@code topic1} partitions, and runs an assignment.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, true);
        Set<TopicPartition> changelogs = Utils.mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2));

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic1", Materialized.as((String)"store"));

        Properties optimizedProps = new Properties();
        optimizedProps.putAll(configProps(parameterizedConfig));
        optimizedProps.put("topology.optimization", "all");
        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build(optimizedProps));
        topologyMetadata = new TopologyMetadata(builder, new StreamsConfig((Map)optimizedProps));

        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack3"));
        subscriptions.put("consumer10", subscription);

        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("topology.optimization", "all"), parameterizedConfig);
        overwriteInternalTopicManagerWithMock(false, parameterizedConfig);

        // Every partition reports a committed offset of Long.MAX_VALUE.
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE)));
        Consumer consumerClient = referenceContainer.mainConsumer;
        Mockito.when((Object)consumerClient.committed(changelogs)).thenReturn(committedOffsets);

        partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions));
    }

    /**
     * Runs an assignment against cluster metadata that contains no partitions and asserts
     * that the assignment returned for {@code "consumer"} carries the
     * {@code INCOMPLETE_SOURCE_TOPIC_METADATA} error code.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldEncodeMissingSourceTopicError(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        Cluster emptyClusterMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptyList(), Collections.emptySet(), Collections.emptySet());

        builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        configureDefault(parameterizedConfig);

        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"), defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack0"));
        subscriptions.put("consumer", subscription);

        Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyClusterMetadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        AssignmentInfo decoded = AssignmentInfo.decode(assignments.get("consumer").userData());
        MatcherAssert.assertThat(decoded.errCode(), CoreMatchers.equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
    }

    /**
     * Asserts that {@code uniqueField()} starts at 0 and is one higher after each call to
     * {@code subscriptionUserData}.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testUniqueField(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor(parameterizedConfig);
        Set<String> subscribedTopics = Utils.mkSet("input");

        Assertions.assertEquals(0, partitionAssignor.uniqueField());

        partitionAssignor.subscriptionUserData(subscribedTopics);
        Assertions.assertEquals(1, partitionAssignor.uniqueField());

        partitionAssignor.subscriptionUserData(subscribedTopics);
        Assertions.assertEquals(2, partitionAssignor.uniqueField());
    }

    /**
     * Calls {@code subscriptionUserData} 127 times, asserts {@code uniqueField()} is 127,
     * then asserts that one further call yields -128.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testUniqueFieldOverflow(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor(parameterizedConfig);
        Set<String> subscribedTopics = Utils.mkSet("input");

        int callsRemaining = 127;
        while (callsRemaining > 0) {
            partitionAssignor.subscriptionUserData(subscribedTopics);
            callsRemaining--;
        }
        Assertions.assertEquals(127, partitionAssignor.uniqueField());

        // One more call is expected to wrap the value around to -128.
        partitionAssignor.subscriptionUserData(subscribedTopics);
        Assertions.assertEquals(-128, partitionAssignor.uniqueField());
    }

    /**
     * Builds a topology on top of a {@code CorruptedInternalTopologyBuilder}, runs an
     * assignment, and asserts that the assignment returned for {@code "consumer"} carries
     * the {@code ASSIGNMENT_ERROR} code.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount(Map<String, Object> parameterizedConfig) {
        this.setUp(parameterizedConfig, false);
        // Swap in the corrupted builder and rebuild the topology metadata around it.
        this.builder = new CorruptedInternalTopologyBuilder();
        this.topologyMetadata = new TopologyMetadata(this.builder, new StreamsConfig(this.configProps(parameterizedConfig)));
        InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(this.builder);
        KStream inputTopic = streamsBuilder.stream(Collections.singleton("topic1"), new ConsumedInternal());
        KTable inputTable = streamsBuilder.table("topic2", new ConsumedInternal(), new MaterializedInternal(Materialized.as((String)"store")));
        // Group, window, aggregate, then left-join against the table read from "topic2".
        inputTopic.groupBy((k, v) -> k, Grouped.with((String)"GroupName", (Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMinutes(10L))).aggregate(() -> "", (k, v, a) -> a + k).leftJoin(inputTable, v -> v, (x, y) -> x + y);
        streamsBuilder.buildAndOptimizeTopology();
        this.configureDefault(parameterizedConfig);
        this.subscriptions.put("consumer", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"), this.defaultSubscriptionInfo.encode(), Collections.emptyList(), -1, Optional.of("rack1")));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        // The failure is reported through the encoded error code, not a thrown exception.
        MatcherAssert.assertThat((Object)AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer")).userData()).errCode(), (Matcher)CoreMatchers.equalTo((Object)AssignorError.ASSIGNMENT_ERROR.code()));
    }

    /**
     * Sets two client tags, builds a subscription from the assignor's
     * {@code subscriptionUserData}, and asserts that the decoded user data equals a
     * {@code SubscriptionInfo} built with those tags and that {@code clientTags()} returns them.
     */
    @ParameterizedTest
    @MethodSource(value={"parameter"})
    public void testClientTags(Map<String, Object> parameterizedConfig) {
        setUp(parameterizedConfig, false);
        clientTags = Utils.mkMap(Utils.mkEntry("cluster", "cluster1"), Utils.mkEntry("zone", "az1"));
        createDefaultMockTaskManager();
        configureDefaultPartitionAssignor(parameterizedConfig);

        Set<String> subscribedTopics = Utils.mkSet("input");
        List<String> topicList = new ArrayList<String>(subscribedTopics);
        ByteBuffer userData = partitionAssignor.subscriptionUserData(subscribedTopics);
        ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(topicList, userData);

        SubscriptionInfo expectedInfo = AssignmentTestUtils.getInfo(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, (byte)1, clientTags);

        Assertions.assertEquals(Collections.singletonList("input"), subscription.topics());
        Assertions.assertEquals(expectedInfo, SubscriptionInfo.decode(subscription.userData()));
        Assertions.assertEquals(clientTags, partitionAssignor.clientTags());
    }

    /**
     * Builds an 8-byte buffer holding the int 12 twice.
     *
     * <p>The buffer is rewound before it is returned, so its position is 0 and all eight
     * bytes are readable with relative reads. Previously it was returned with its position
     * at the limit, so it only worked for consumers that rewound it themselves.
     *
     * @return a buffer positioned at 0 with 8 bytes remaining
     */
    private static ByteBuffer encodeFutureSubscription() {
        // NOTE(review): the two ints are presumably "used version" and "latest supported
        // version", both one above the current latest - confirm against SubscriptionInfo.
        ByteBuffer buf = ByteBuffer.allocate(2 * Integer.BYTES);
        buf.putInt(12);
        buf.putInt(12);
        buf.rewind();
        return buf;
    }

    /**
     * Registers one subscription encoded with {@code oldVersion} and one built from
     * {@code encodeFutureSubscription()}, runs an assignment, and asserts that both
     * members receive the {@code ASSIGNMENT_ERROR} code.
     *
     * @param oldVersion subscription version used for the first consumer
     * @param parameterizedConfig configuration forwarded to {@code configureDefault}
     */
    private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(int oldVersion, Map<String, Object> parameterizedConfig) {
        final String futureConsumer = "future-consumer";

        ByteBuffer oldUserData = getInfoForOlderVersion(oldVersion, AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode();
        subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), oldUserData, Collections.emptyList(), -1, Optional.of("rack0")));

        ByteBuffer futureUserData = encodeFutureSubscription();
        subscriptions.put(futureConsumer, new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), futureUserData, Collections.emptyList(), -1, Optional.of("rack1")));

        configureDefault(parameterizedConfig);
        Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        AssignmentInfo oldMemberInfo = AssignmentInfo.decode(assignment.get(CONSUMER_1).userData());
        MatcherAssert.assertThat(oldMemberInfo.errCode(), CoreMatchers.equalTo(AssignorError.ASSIGNMENT_ERROR.code()));

        AssignmentInfo futureMemberInfo = AssignmentInfo.decode(assignment.get(futureConsumer).userData());
        MatcherAssert.assertThat(futureMemberInfo.errCode(), CoreMatchers.equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
    }

    /**
     * Wraps the given host-state map in an otherwise empty {@code AssignmentInfo} and
     * returns an assignment that has no partitions and carries the encoded info as user data.
     *
     * @param firstHostState host-to-partitions map passed to the {@code AssignmentInfo} constructor
     * @return an assignment with an empty partition list and the encoded info
     */
    private static ConsumerPartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> firstHostState) {
        ByteBuffer encodedInfo = new AssignmentInfo(11, Collections.emptyList(), Collections.emptyMap(), firstHostState, Collections.emptyMap(), 0).encode();
        return new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), encodedInfo);
    }

    /**
     * Decodes the assignment's user data and checks it against the assigned partitions.
     *
     * <p>Asserted here:
     * <ul>
     *   <li>the number of assigned partitions equals the number of active tasks;</li>
     *   <li>the active tasks equal, in order, {@code new TaskId(0, p)} for each assigned partition {@code p};</li>
     *   <li>the topics of the assigned partitions equal {@code expectedTopics};</li>
     *   <li>each standby partition's number equals its task's partition number;</li>
     *   <li>if there are standby tasks, their topics also equal {@code expectedTopics}.</li>
     * </ul>
     *
     * <p>The standby-task loop uses typed entries; the raw {@code Map.Entry}/{@code Set}
     * form did not type-check when iterated as {@code TopicPartition}.
     *
     * @param expectedTopics topics that active (and, if present, standby) partitions must cover exactly
     * @param assignment the assignment to verify
     * @return the decoded {@code AssignmentInfo}, for further assertions by the caller
     */
    private static AssignmentInfo checkAssignment(Set<String> expectedTopics, ConsumerPartitionAssignor.Assignment assignment) {
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        Assertions.assertEquals(assignment.partitions().size(), info.activeTasks().size());

        List<TaskId> activeTasks = new ArrayList<>();
        Set<String> activeTopics = new HashSet<>();
        for (TopicPartition partition : assignment.partitions()) {
            activeTasks.add(new TaskId(0, partition.partition()));
            activeTopics.add(partition.topic());
        }
        Assertions.assertEquals(activeTasks, info.activeTasks());
        Assertions.assertEquals(expectedTopics, activeTopics);

        Map<TaskId, Set<TopicPartition>> standbyTasks = info.standbyTasks();
        Set<String> standbyTopics = new HashSet<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
            TaskId id = entry.getKey();
            for (TopicPartition partition : entry.getValue()) {
                Assertions.assertEquals(id.partition(), partition.partition());
                standbyTopics.add(partition.topic());
            }
        }
        // Standby topics are only compared when there is at least one standby task.
        if (!standbyTasks.isEmpty()) {
            Assertions.assertEquals(expectedTopics, standbyTopics);
        }
        return info;
    }

    /**
     * Asserts that two consumer-to-tasks maps have the same size and the same consumers,
     * and that each consumer's task list matches once both lists are sorted.
     *
     * <p>Both task lists are sorted in place, so the lists in the supplied maps are
     * reordered by this call.
     *
     * @param thisAssignment first assignment; its task lists are sorted in place
     * @param otherAssignment second assignment; its task lists are sorted in place
     */
    private static void assertEquivalentAssignment(Map<String, List<TaskId>> thisAssignment, Map<String, List<TaskId>> otherAssignment) {
        Assertions.assertEquals(thisAssignment.size(), otherAssignment.size());
        thisAssignment.forEach((consumer, thisTaskList) -> {
            Assertions.assertTrue(otherAssignment.containsKey(consumer));
            thisTaskList.sort(null);
            List<TaskId> otherTaskList = otherAssignment.get(consumer);
            otherTaskList.sort(null);
            MatcherAssert.assertThat(thisTaskList, (Matcher)CoreMatchers.equalTo(otherTaskList));
        });
    }

    /**
     * Builds a map that assigns {@code Long.MAX_VALUE} to every partition of every given topic.
     *
     * @param changelogTopics topic names
     * @param topicsNumPartitions partition count for the topic at the same index in {@code changelogTopics}
     * @return one entry per (topic, partition) pair, for partitions {@code 0..count-1}
     * @throws IllegalStateException if the two lists differ in size
     */
    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> changelogTopics, List<Integer> topicsNumPartitions) {
        final int topicCount = changelogTopics.size();
        final int countEntries = topicsNumPartitions.size();
        if (topicCount != countEntries) {
            throw new IllegalStateException("Passed in " + topicCount + " changelog topic names, but " + countEntries + " different numPartitions for the topics");
        }

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        int topicIndex = 0;
        for (String topicName : changelogTopics) {
            int partitionsForTopic = topicsNumPartitions.get(topicIndex);
            topicIndex++;
            int partitionId = 0;
            while (partitionId < partitionsForTopic) {
                endOffsets.put(new TopicPartition(topicName, partitionId), Long.MAX_VALUE);
                partitionId++;
            }
        }
        return endOffsets;
    }

    /**
     * Builds a {@code TopicDescription} for each topic from the partition info at the same
     * index, keyed by topic name. The second constructor argument is always {@code false}.
     *
     * @param changelogTopics topic names
     * @param topicPartitionInfos partition info for the topic at the same index in {@code changelogTopics}
     * @return topic name to description
     * @throws IllegalStateException if the two lists differ in size
     */
    private static Map<String, TopicDescription> getTopicDescriptionMap(List<String> changelogTopics, List<List<TopicPartitionInfo>> topicPartitionInfos) {
        final int topicCount = changelogTopics.size();
        final int infoCount = topicPartitionInfos.size();
        if (topicCount != infoCount) {
            throw new IllegalStateException("Passed in " + topicCount + " changelog topic names, but " + infoCount + " different topicPartitionInfo for the topics");
        }

        Map<String, TopicDescription> descriptionsByTopic = new HashMap<>();
        int index = 0;
        for (String topicName : changelogTopics) {
            List<TopicPartitionInfo> partitions = topicPartitionInfos.get(index);
            index++;
            descriptionsByTopic.put(topicName, new TopicDescription(topicName, false, partitions));
        }
        return descriptionsByTopic;
    }

    /**
     * Creates a {@code SubscriptionInfo} for the given version whose task offset sums come
     * from {@code getTaskOffsetSums(prevTasks, standbyTasks)} and whose client tags are empty.
     *
     * @param version first constructor argument; the second is fixed at 11
     * @param processId process id of the subscribing client
     * @param prevTasks tasks passed to {@code getTaskOffsetSums} as the active tasks
     * @param standbyTasks tasks passed to {@code getTaskOffsetSums} as the standby tasks
     * @return the constructed subscription info
     */
    private static SubscriptionInfo getInfoForOlderVersion(int version, ProcessId processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
        Map<TaskId, Long> taskOffsetSums = getTaskOffsetSums(prevTasks, standbyTasks);
        return new SubscriptionInfo(version, 11, processId, null, taskOffsetSums, 0, 0, AssignmentTestUtils.EMPTY_CLIENT_TAGS);
    }

    /**
     * Maps every active task to {@code -2L} and every standby task to {@code 0L}. Standby
     * entries are added last, so a task present in both collections ends up mapped to {@code 0L}.
     *
     * @param activeTasks tasks mapped to {@code -2L}
     * @param standbyTasks tasks mapped to {@code 0L}
     * @return the combined task-to-offset-sum map
     */
    private static Map<TaskId, Long> getTaskOffsetSums(Collection<TaskId> activeTasks, Collection<TaskId> standbyTasks) {
        // NOTE(review): -2L is presumably a sentinel offset for active tasks - confirm.
        Map<TaskId, Long> offsetSums = activeTasks.stream().collect(Collectors.toMap(task -> task, task -> -2L));
        Map<TaskId, Long> standbySums = standbyTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        offsetSums.putAll(standbySums);
        return offsetSums;
    }

    /**
     * Maps every given task to {@code Long.MAX_VALUE}.
     *
     * @param allStatefulTasks tasks to include as keys
     * @return task to {@code Long.MAX_VALUE}
     */
    private static Map<TaskId, Long> getTaskEndOffsetSums(Collection<TaskId> allStatefulTasks) {
        Map<TaskId, Long> endOffsetSums = allStatefulTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        return endOffsetSums;
    }

    /**
     * Builds one topic-to-partition-info map for each set of topics in {@code topicsList}.
     *
     * <p>Each topic gets partitions {@code 0..replicaCount-1}. Partition {@code i} uses the
     * i-th of the four fixed test nodes as leader and that node's replica array as both of
     * the remaining two list arguments.
     *
     * @param replicaCount number of partition entries generated per topic; only four nodes
     *                     are listed, so values above 4 fail with an index error
     * @param topicsList groups of topic names; one result map is produced per group
     * @return the generated maps, in the order of {@code topicsList}
     */
    private static List<Map<String, List<TopicPartitionInfo>>> getTopicPartitionInfo(int replicaCount, List<Set<String>> topicsList) {
        List<KeyValue<Node, List<Node>>> leadersWithReplicas = Arrays.asList(
            KeyValue.pair(AssignmentTestUtils.NODE_0, Arrays.asList(AssignmentTestUtils.REPLICA_0)),
            KeyValue.pair(AssignmentTestUtils.NODE_1, Arrays.asList(AssignmentTestUtils.REPLICA_1)),
            KeyValue.pair(AssignmentTestUtils.NODE_2, Arrays.asList(AssignmentTestUtils.REPLICA_2)),
            KeyValue.pair(AssignmentTestUtils.NODE_3, Arrays.asList(AssignmentTestUtils.REPLICA_3)));

        List<Map<String, List<TopicPartitionInfo>>> infoPerGroup = new ArrayList<>();
        for (Set<String> topicGroup : topicsList) {
            Map<String, List<TopicPartitionInfo>> infoByTopic = new HashMap<>();
            for (String topicName : topicGroup) {
                List<TopicPartitionInfo> partitions = new ArrayList<>();
                for (int partitionId = 0; partitionId < replicaCount; ++partitionId) {
                    KeyValue<Node, List<Node>> leaderAndReplicas = leadersWithReplicas.get(partitionId);
                    partitions.add(new TopicPartitionInfo(partitionId, leaderAndReplicas.key, leaderAndReplicas.value, leaderAndReplicas.value));
                }
                infoByTopic.put(topicName, partitions);
            }
            infoPerGroup.add(infoByTopic);
        }
        return infoPerGroup;
    }

    private static class CorruptedInternalTopologyBuilder
    extends InternalTopologyBuilder {
        private Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> corruptedTopicGroups;

        private CorruptedInternalTopologyBuilder() {
        }

        public synchronized Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyToTopicsInfo() {
            if (this.corruptedTopicGroups == null) {
                this.corruptedTopicGroups = new HashMap<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo>();
                for (Map.Entry topicGroupEntry : super.subtopologyToTopicsInfo().entrySet()) {
                    InternalTopologyBuilder.TopicsInfo originalInfo = (InternalTopologyBuilder.TopicsInfo)topicGroupEntry.getValue();
                    this.corruptedTopicGroups.put((TopologyMetadata.Subtopology)topicGroupEntry.getKey(), new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), originalInfo.sourceTopics, originalInfo.repartitionSourceTopics, originalInfo.stateChangelogTopics));
                }
            }
            return this.corruptedTopicGroups;
        }
    }
}

