/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsRebalanceListener;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;
import org.slf4j.LoggerFactory;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class StreamsRebalanceListenerTest {
    @Mock
    private TaskManager taskManager;
    @Mock
    private StreamThread streamThread;
    private final AtomicInteger assignmentErrorCode = new AtomicInteger();
    private final MockTime time = new MockTime();
    private StreamsRebalanceListener streamsRebalanceListener;

    @Before
    public void setup() {
        this.streamsRebalanceListener = new StreamsRebalanceListener((Time)this.time, this.taskManager, this.streamThread, LoggerFactory.getLogger(StreamsRebalanceListenerTest.class), this.assignmentErrorCode);
    }

    @Test
    public void shouldThrowMissingSourceTopicException() {
        this.assignmentErrorCode.set(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code());
        MissingSourceTopicException exception = (MissingSourceTopicException)Assert.assertThrows(MissingSourceTopicException.class, () -> this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList()));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.is((Object)"One or more source topics were missing during rebalance"));
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldSwallowVersionProbingError() {
        this.assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        ((StreamThread)Mockito.verify((Object)this.streamThread)).setState(StreamThread.State.PARTITIONS_ASSIGNED);
        ((StreamThread)Mockito.verify((Object)this.streamThread)).setPartitionAssignedTime(this.time.milliseconds());
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldSendShutdown() {
        this.assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleRebalanceComplete();
        ((StreamThread)Mockito.verify((Object)this.streamThread)).shutdownToError();
    }

    @Test
    public void shouldThrowTaskAssignmentException() {
        this.assignmentErrorCode.set(AssignorError.ASSIGNMENT_ERROR.code());
        TaskAssignmentException exception = (TaskAssignmentException)Assert.assertThrows(TaskAssignmentException.class, () -> this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList()));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.is((Object)"Hit an unexpected exception during task assignment phase of rebalance"));
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
        this.assignmentErrorCode.set(Integer.MAX_VALUE);
        TaskAssignmentException exception = (TaskAssignmentException)Assert.assertThrows(TaskAssignmentException.class, () -> this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList()));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.is((Object)"Hit an unrecognized exception during rebalance"));
    }

    @Test
    public void shouldHandleAssignedPartitions() {
        this.assignmentErrorCode.set(AssignorError.NONE.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        ((StreamThread)Mockito.verify((Object)this.streamThread)).setState(StreamThread.State.PARTITIONS_ASSIGNED);
        ((StreamThread)Mockito.verify((Object)this.streamThread)).setPartitionAssignedTime(this.time.milliseconds());
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldHandleRevokedPartitions() {
        List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic", 0));
        Mockito.when((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).thenReturn((Object)StreamThread.State.RUNNING);
        this.streamsRebalanceListener.onPartitionsRevoked(partitions);
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleRevocation(partitions);
    }

    @Test
    public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() {
        Mockito.when((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).thenReturn(null);
        this.streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition("topic", 0)));
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.never())).handleRevocation((Collection)ArgumentMatchers.any());
    }

    @Test
    public void shouldNotHandleEmptySetOfRevokedPartitions() {
        Mockito.when((Object)this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).thenReturn((Object)StreamThread.State.RUNNING);
        this.streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList());
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.never())).handleRevocation((Collection)ArgumentMatchers.any());
    }

    @Test
    public void shouldHandleLostPartitions() {
        this.streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition("topic", 0)));
        ((TaskManager)Mockito.verify((Object)this.taskManager)).handleLostAll();
    }
}

