/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={IntegrationTest.class})
public class StreamsAssignmentScaleTest {
    static final long MAX_ASSIGNMENT_DURATION = 60000L;
    static final String APPLICATION_ID = "streams-assignment-scale-test";
    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);

    @Test(timeout=120000L)
    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
        this.completeLargeAssignment(6000, 2, 1, 1, HighAvailabilityTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
        this.completeLargeAssignment(1000, 1000, 1, 1, HighAvailabilityTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testHighAvailabilityTaskAssignorManyStandbys() {
        this.completeLargeAssignment(1000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
        this.completeLargeAssignment(1000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testStickyTaskAssignorLargePartitionCount() {
        this.completeLargeAssignment(2000, 2, 1, 1, StickyTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testStickyTaskAssignorLargeNumConsumers() {
        this.completeLargeAssignment(1000, 1000, 1, 1, StickyTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testStickyTaskAssignorManyStandbys() {
        this.completeLargeAssignment(1000, 100, 1, 20, StickyTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testStickyTaskAssignorManyThreadsPerClient() {
        this.completeLargeAssignment(1000, 10, 1000, 1, StickyTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testFallbackPriorTaskAssignorLargePartitionCount() {
        this.completeLargeAssignment(2000, 2, 1, 1, FallbackPriorTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testFallbackPriorTaskAssignorLargeNumConsumers() {
        this.completeLargeAssignment(1000, 1000, 1, 1, FallbackPriorTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testFallbackPriorTaskAssignorManyStandbys() {
        this.completeLargeAssignment(1000, 100, 1, 20, FallbackPriorTaskAssignor.class);
    }

    @Test(timeout=120000L)
    public void testFallbackPriorTaskAssignorManyThreadsPerClient() {
        this.completeLargeAssignment(1000, 10, 1000, 1, FallbackPriorTaskAssignor.class);
    }

    private void completeLargeAssignment(int numPartitions, int numClients, int numThreadsPerClient, int numStandbys, Class<? extends TaskAssignor> taskAssignor) {
        List<String> topic = Collections.singletonList("topic");
        HashMap<TopicPartition, Long> changelogEndOffsets = new HashMap<TopicPartition, Long>();
        for (int p = 0; p < numPartitions; ++p) {
            changelogEndOffsets.put(new TopicPartition("streams-assignment-scale-test-store-changelog", p), 100000L);
        }
        ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
        for (int p = 0; p < numPartitions; ++p) {
            partitionInfos.add(new PartitionInfo("topic", p, Node.noNode(), new Node[0], new Node[0]));
        }
        Cluster clusterMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), partitionInfos, Collections.emptySet(), Collections.emptySet());
        HashMap<String, Object> configMap = new HashMap<String, Object>();
        configMap.put("application.id", APPLICATION_ID);
        configMap.put("bootstrap.servers", "localhost:8080");
        InternalTopologyBuilder builder = new InternalTopologyBuilder();
        builder.addSource(null, "source", null, null, null, new String[]{"topic"});
        builder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store", false), new String[]{"processor"});
        TopologyMetadata topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configMap));
        topologyMetadata.buildAndRewriteTopology();
        Consumer mainConsumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        TaskManager taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        EasyMock.expect((Object)taskManager.topologyMetadata()).andStubReturn((Object)topologyMetadata);
        EasyMock.expect((Object)mainConsumer.committed(new HashSet())).andStubReturn(Collections.emptyMap());
        AdminClient adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(changelogEndOffsets);
        ReferenceContainer referenceContainer = new ReferenceContainer();
        referenceContainer.mainConsumer = mainConsumer;
        referenceContainer.adminClient = adminClient;
        referenceContainer.taskManager = taskManager;
        referenceContainer.streamsMetadataState = (StreamsMetadataState)EasyMock.createNiceMock(StreamsMetadataState.class);
        referenceContainer.time = new MockTime();
        configMap.put("__reference.container.instance__", referenceContainer);
        configMap.put("internal.task.assignor.class", taskAssignor.getName());
        configMap.put("num.standby.replicas", numStandbys);
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager((Time)new MockTime(), new StreamsConfig(configMap), new MockClientSupplier().restoreConsumer, false);
        EasyMock.replay((Object[])new Object[]{taskManager, adminClient, mainConsumer});
        StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
        partitionAssignor.configure(configMap);
        partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);
        HashMap<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<String, ConsumerPartitionAssignor.Subscription>();
        for (int client = 0; client < numClients; ++client) {
            for (int i = 0; i < numThreadsPerClient; ++i) {
                subscriptions.put(this.getConsumerName(i, client), new ConsumerPartitionAssignor.Subscription(topic, AssignmentTestUtils.getInfo(AssignmentTestUtils.uuidForInt(client), AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
            }
        }
        long firstAssignmentStartMs = System.currentTimeMillis();
        Map firstAssignments = partitionAssignor.assign(clusterMetadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        long firstAssignmentEndMs = System.currentTimeMillis();
        long firstAssignmentDuration = firstAssignmentEndMs - firstAssignmentStartMs;
        if (firstAssignmentDuration > 60000L) {
            throw new AssertionError((Object)("The first assignment took too long to complete at " + firstAssignmentDuration + "ms."));
        }
        this.log.info("First assignment took {}ms.", (Object)firstAssignmentDuration);
        for (int client = 0; client < numClients; ++client) {
            for (int i = 0; i < numThreadsPerClient; ++i) {
                String consumer = this.getConsumerName(i, client);
                ConsumerPartitionAssignor.Assignment assignment = (ConsumerPartitionAssignor.Assignment)firstAssignments.get(consumer);
                AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)assignment.userData());
                subscriptions.put(consumer, new ConsumerPartitionAssignor.Subscription(topic, AssignmentTestUtils.getInfo(AssignmentTestUtils.uuidForInt(client), new HashSet<TaskId>(info.activeTasks()), info.standbyTasks().keySet()).encode(), assignment.partitions()));
            }
        }
        long secondAssignmentStartMs = System.currentTimeMillis();
        Map secondAssignments = partitionAssignor.assign(clusterMetadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        long secondAssignmentEndMs = System.currentTimeMillis();
        long secondAssignmentDuration = secondAssignmentEndMs - secondAssignmentStartMs;
        if (secondAssignmentDuration > 60000L) {
            throw new AssertionError((Object)("The second assignment took too long to complete at " + secondAssignmentDuration + "ms."));
        }
        this.log.info("Second assignment took {}ms.", (Object)secondAssignmentDuration);
        MatcherAssert.assertThat((Object)secondAssignments.size(), (Matcher)CoreMatchers.is((Object)(numClients * numThreadsPerClient)));
    }

    private String getConsumerName(int consumerIndex, int clientIndex) {
        return "consumer-" + clientIndex + "-" + consumerIndex;
    }
}

