/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.backpressure;

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.limit.SettableLimit;
import io.camunda.zeebe.broker.transport.backpressure.CommandRateLimiter;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests for {@link CommandRateLimiter}.
 *
 * <p>The tests cover acquiring requests up to the configured limit, releasing them again through
 * {@code onResponse} / {@code onIgnore}, and the intents that are expected to be accepted even
 * after the limit has been reached (referred to as "whitelisted" below).
 */
class CommandRateLimiterTest {
    private static final int INITIAL_LIMIT = 5;

    private final SettableLimit limit = new SettableLimit(INITIAL_LIMIT);
    private final CommandRateLimiter rateLimiter = newRateLimiter(limit);

    /** Intent used for "normal" commands, i.e. ones that are expected to be rate limited. */
    private final Intent context = ProcessInstanceCreationIntent.CREATE;

    /** A single request must be accepted while nothing else is in flight. */
    @Test
    void shouldAcquire() {
        Assertions.assertThat(rateLimiter.tryAcquire(0, 1L, context)).isTrue();
    }

    /** Once {@code limit} requests were accepted, the next normal request must be rejected. */
    @Test
    void shouldNotAcquireAfterLimit() {
        // given
        for (int i = 0; i < limit.getLimit(); i++) {
            Assertions.assertThat(rateLimiter.tryAcquire(0, 1L, context)).isTrue();
        }

        // then
        Assertions.assertThat(rateLimiter.tryAcquire(0, 1L, context)).isFalse();
    }

    /** Completing one in-flight request must make room for exactly one new request. */
    @Test
    void shouldCompleteRequestOnResponse() {
        // given
        for (int i = 0; i < limit.getLimit(); i++) {
            rateLimiter.tryAcquire(0, i, context);
        }
        Assertions.assertThat(rateLimiter.tryAcquire(0, 100L, context)).isFalse();

        // when
        rateLimiter.onResponse(0, 0L);

        // then
        Assertions.assertThat(rateLimiter.tryAcquire(0, 100L, context)).isTrue();
    }

    /** Completing all in-flight requests must free the whole limit again, but not more. */
    @Test
    void shouldCompleteAllRequests() {
        // given
        for (int i = 0; i < limit.getLimit(); i++) {
            rateLimiter.tryAcquire(0, i, context);
        }
        Assertions.assertThat(rateLimiter.tryAcquire(0, 100L, context)).isFalse();

        // when
        for (int i = 0; i < limit.getLimit(); i++) {
            rateLimiter.onResponse(0, i);
        }

        // then
        for (int i = 0; i < limit.getLimit(); i++) {
            Assertions.assertThat(rateLimiter.tryAcquire(0, i, context)).isTrue();
        }
        Assertions.assertThat(rateLimiter.tryAcquire(0, 100L, context)).isFalse();
    }

    /** Ignoring an acquired request must remove it from the in-flight count. */
    @Test
    void shouldReleaseRequestOnIgnore() {
        // given
        rateLimiter.tryAcquire(0, 1L, context);
        Assertions.assertThat(rateLimiter.getInflightCount()).isEqualTo(1);

        // when
        rateLimiter.onIgnore(0, 1L);

        // then
        Assertions.assertThat(rateLimiter.getInflightCount()).isEqualTo(0);
    }

    /**
     * A whitelisted intent must still be accepted after the limit was reached and a normal command
     * has already been rejected.
     *
     * @param intent one of the intents supplied by {@link #provideWhitelistedIntents()}
     */
    @ParameterizedTest
    @MethodSource("provideWhitelistedIntents")
    void shouldWhiteListedCommandAfterLimit(final Intent intent) {
        // given
        for (int i = 0; i < limit.getLimit(); i++) {
            Assertions.assertThat(rateLimiter.tryAcquire(0, 1L, context)).isTrue();
        }

        // then
        Assertions.assertThat(rateLimiter.tryAcquire(0, 1L, context)).isFalse();
        Assertions.assertThat(rateLimiter.tryAcquire(0, 1L, intent)).isTrue();
    }

    /** Supplies the intents that {@link #shouldWhiteListedCommandAfterLimit(Intent)} runs with. */
    private static Stream<Arguments> provideWhitelistedIntents() {
        return Stream.of(
                Arguments.of(JobIntent.COMPLETE),
                Arguments.of(JobIntent.FAIL),
                Arguments.of(ProcessInstanceIntent.CANCEL),
                Arguments.of(DeploymentIntent.CREATE),
                Arguments.of(DeploymentIntent.DISTRIBUTE),
                Arguments.of(DeploymentDistributionIntent.COMPLETE),
                Arguments.of(CommandDistributionIntent.ACKNOWLEDGE));
    }

    /**
     * Hammers a limiter from many threads with normal commands only and verifies that the
     * in-flight count observed by any task never exceeds the limit, and that it drops back to zero
     * once every task has responded.
     */
    @Test
    void shouldNotAllowInFlightHigherThanLimitWithNormalCommands() throws InterruptedException {
        // given
        final int numTasks = 3000;
        final int poolSize = 300;
        final int maxInflight = 100;
        final CommandRateLimiter myRateLimiter = newRateLimiter(new SettableLimit(maxInflight));

        final Collection<Callable<Void>> tasks =
                IntStream.range(0, numTasks)
                        .mapToObj(i -> inflightCheckingTask(myRateLimiter, i, context, maxInflight))
                        .collect(Collectors.toList());

        // when
        runAllAndPropagateFailures(poolSize, tasks);

        // then
        Assertions.assertThat(myRateLimiter.getInflightCount()).isEqualTo(0);
    }

    /**
     * Same load as {@link #shouldNotAllowInFlightHigherThanLimitWithNormalCommands()}, but every
     * odd-numbered request uses a whitelisted intent. The in-flight count is therefore allowed to
     * exceed the configured limit; the test only tolerates it up to a relaxed bound.
     */
    @Test
    void shouldAllowInFlightHigherThanLimitWithWhitelistedCommands() throws InterruptedException {
        // given
        final int numTasks = 3000;
        final int poolSize = 300;
        final int configuredLimit = 100;
        // upper bound tolerated by this test once whitelisted commands bypass the limit
        final int relaxedBound = 200;
        final CommandRateLimiter myRateLimiter = newRateLimiter(new SettableLimit(configuredLimit));
        final Intent whitelistedIntent = JobIntent.COMPLETE;

        final Collection<Callable<Void>> tasks =
                IntStream.range(0, numTasks)
                        .mapToObj(
                                i -> {
                                    final Intent intent = i % 2 == 0 ? context : whitelistedIntent;
                                    return inflightCheckingTask(myRateLimiter, i, intent, relaxedBound);
                                })
                        .collect(Collectors.toList());

        // when
        runAllAndPropagateFailures(poolSize, tasks);

        // then
        Assertions.assertThat(myRateLimiter.getInflightCount()).isEqualTo(0);
    }

    /**
     * Builds a rate limiter around the given limit, backed by a fresh {@link SimpleMeterRegistry}.
     *
     * @param limit the limit the limiter is configured with
     * @return a new limiter instance
     */
    private static CommandRateLimiter newRateLimiter(final Limit limit) {
        final MeterRegistry meterRegistry = new SimpleMeterRegistry();
        final CommandRateLimiter.CommandRateLimiterBuilder builder =
                (CommandRateLimiter.CommandRateLimiterBuilder) CommandRateLimiter.builder().limit(limit);
        return builder.build(meterRegistry, 0);
    }

    /**
     * Creates a task that tries to acquire a request, keeps it open for a random time below 100 ms,
     * asserts that the observed in-flight count is within {@code maxInflight}, and then responds.
     *
     * @param limiter the limiter under test
     * @param requestId the request id used for both acquire and response
     * @param intent the intent the request is acquired with
     * @param maxInflight the largest in-flight count the task accepts
     * @return the task; an assertion failure surfaces as an exception of the task
     */
    private static Callable<Void> inflightCheckingTask(
            final CommandRateLimiter limiter,
            final long requestId,
            final Intent intent,
            final int maxInflight) {
        return () -> {
            final int sleepTime = ThreadLocalRandom.current().nextInt(100);
            limiter.tryAcquire(0, requestId, intent);
            Thread.sleep(sleepTime);
            Assertions.assertThat(limiter.getInflightCount()).isLessThanOrEqualTo(maxInflight);
            limiter.onResponse(0, requestId);
            return null;
        };
    }

    /**
     * Runs all tasks on a fixed pool of {@code poolSize} threads, waits for them to finish and
     * fails if any of them threw.
     *
     * <p>{@code invokeAll} stores a task's exception in its future instead of throwing it, so
     * every future has to be queried; otherwise assertion failures inside the tasks would be lost.
     *
     * @param poolSize number of worker threads
     * @param tasks the tasks to execute
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws AssertionError if a task failed; the task's exception is attached as the cause
     */
    private static void runAllAndPropagateFailures(
            final int poolSize, final Collection<Callable<Void>> tasks) throws InterruptedException {
        final ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
        try {
            for (final Future<Void> result : threadPool.invokeAll(tasks)) {
                try {
                    result.get();
                } catch (final ExecutionException e) {
                    throw new AssertionError("A concurrent task failed", e.getCause());
                }
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

