/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.clientside.ClientTestUtil;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.spi.ClientInvocationService;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.SmartClientInvocationService;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.bounce.MultiSocketClientDriverFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastOverloadException;
import com.hazelcast.core.IMap;
import com.hazelcast.internal.util.ThreadLocalRandomProvider;
import com.hazelcast.test.HazelcastParametersRunnerFactory;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.test.bounce.BounceMemberRule;
import com.hazelcast.test.bounce.DriverFactory;
import com.hazelcast.util.ExceptionUtil;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Parameterized.UseParametersRunnerFactory(value=HazelcastParametersRunnerFactory.class)
@Category(value={SlowTest.class})
public class ClientBackpressureBouncingTest
extends HazelcastTestSupport {
    private static final long TEST_DURATION_SECONDS = 240L;
    private static final long TEST_TIMEOUT_MILLIS = 600000L;
    private static final int MAX_CONCURRENT_INVOCATION_CONFIG = 100;
    private static final int WORKER_THREAD_COUNT = 5;
    @Rule
    public BounceMemberRule bounceMemberRule;
    private InvocationCheckingThread checkingThread;
    private long backoff;

    @Parameterized.Parameters(name="backoffTimeoutMillis:{0}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList({-1}, {60000});
    }

    public ClientBackpressureBouncingTest(int backoffTimeoutMillis) {
        this.backoff = backoffTimeoutMillis;
        this.bounceMemberRule = BounceMemberRule.with((Config)new Config()).driverFactory((DriverFactory)new MultiSocketClientDriverFactory(new ClientConfig().setProperty(ClientProperty.MAX_CONCURRENT_INVOCATIONS.getName(), String.valueOf(100)).setProperty(ClientProperty.BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS.getName(), String.valueOf(this.backoff)))).build();
    }

    @After
    public void tearDown() {
        if (this.checkingThread != null) {
            this.checkingThread.shutdown();
        }
    }

    @Test(timeout=600000L)
    public void testInFlightInvocationCountIsNotGrowing() {
        HazelcastInstance driver = this.bounceMemberRule.getNextTestDriver();
        IMap map = driver.getMap(ClientBackpressureBouncingTest.randomMapName());
        Runnable[] tasks = this.createTasks((IMap<Integer, Integer>)map);
        this.checkingThread = new InvocationCheckingThread(driver);
        this.checkingThread.start();
        this.bounceMemberRule.testRepeatedly(tasks, 240L);
        System.out.println("Finished bouncing");
        this.checkingThread.shutdown();
        this.checkingThread.assertInFlightInvocationsWereNotGrowing();
    }

    private Runnable[] createTasks(IMap<Integer, Integer> map) {
        Runnable[] tasks = new Runnable[5];
        for (int i = 0; i < 5; ++i) {
            tasks[i] = new MyRunnable(map, i);
        }
        return tasks;
    }

    private class MyRunnable
    implements Runnable {
        private final ExecutionCallback<Integer> callback = new CountingCallback();
        private final AtomicLong backpressureCounter = new AtomicLong();
        private final AtomicLong progressCounter = new AtomicLong();
        private final AtomicLong failureCounter = new AtomicLong();
        private final IMap<Integer, Integer> map;
        private final int workerNo;

        MyRunnable(IMap<Integer, Integer> map, int workerNo) {
            this.map = map;
            this.workerNo = workerNo;
        }

        @Override
        public void run() {
            block3: {
                try {
                    int key = ThreadLocalRandomProvider.get().nextInt();
                    this.map.getAsync((Object)key).andThen(this.callback);
                }
                catch (HazelcastOverloadException e) {
                    long current;
                    if (ClientBackpressureBouncingTest.this.backoff != -1L) {
                        Assert.fail((String)String.format("HazelcastOverloadException should not be thrown when backoff is configured (%d ms), but got: %s", new Object[]{ClientBackpressureBouncingTest.this.backoff, e}));
                    }
                    if ((current = this.backpressureCounter.incrementAndGet()) % 250000L != 0L) break block3;
                    System.out.println("Worker no. " + this.workerNo + " backpressured. counter: " + current);
                }
            }
        }

        private class CountingCallback
        implements ExecutionCallback<Integer> {
            private CountingCallback() {
            }

            public void onResponse(Integer response) {
                long position = MyRunnable.this.progressCounter.incrementAndGet();
                if (position % 50000L == 0L) {
                    System.out.println("Worker no. " + MyRunnable.this.workerNo + " at " + position);
                }
            }

            public void onFailure(Throwable t) {
                long position = MyRunnable.this.failureCounter.incrementAndGet();
                if (position % 100L == 0L) {
                    System.out.println("Failure Worker no. " + MyRunnable.this.workerNo + " at " + position);
                }
            }
        }
    }

    private static class InvocationCheckingThread
    extends Thread {
        private final long warmUpDeadline;
        private final long deadLine;
        private final ConcurrentMap<Long, ClientInvocation> invocations;
        private int maxInvocationCountObserved;
        private int maxInvocationCountObservedDuringWarmup;
        private volatile boolean running = true;

        private InvocationCheckingThread(HazelcastInstance client) {
            long durationMillis = TimeUnit.SECONDS.toMillis(240L);
            long now = System.currentTimeMillis();
            this.warmUpDeadline = now + durationMillis / 5L;
            this.deadLine = now + durationMillis;
            this.invocations = this.extractInvocations(client);
        }

        @Override
        public void run() {
            while (System.currentTimeMillis() < this.deadLine && this.running) {
                int currentSize = this.invocations.size();
                this.maxInvocationCountObserved = Math.max(currentSize, this.maxInvocationCountObserved);
                if (System.currentTimeMillis() < this.warmUpDeadline) {
                    this.maxInvocationCountObservedDuringWarmup = Math.max(currentSize, this.maxInvocationCountObservedDuringWarmup);
                }
                HazelcastTestSupport.sleepAtLeastMillis((long)100L);
            }
        }

        private void shutdown() {
            this.running = false;
            this.interrupt();
            HazelcastTestSupport.assertJoinable((Thread[])new Thread[]{this});
        }

        private void assertInFlightInvocationsWereNotGrowing() {
            Assert.assertTrue((String)"There are no invocations to be observed!", (this.maxInvocationCountObserved > 0 ? 1 : 0) != 0);
            long maximumTolerableInvocationCount = this.maxInvocationCountObservedDuringWarmup * 2;
            Assert.assertTrue((String)("Apparently number of in-flight invocations is growing. Max. number of in-flight invocation during first fifth of test duration: " + this.maxInvocationCountObservedDuringWarmup + " Max. number of in-flight invocation in total: " + this.maxInvocationCountObserved), ((long)this.maxInvocationCountObserved <= maximumTolerableInvocationCount ? 1 : 0) != 0);
        }

        private ConcurrentMap<Long, ClientInvocation> extractInvocations(HazelcastInstance client) {
            try {
                HazelcastClientInstanceImpl clientImpl = ClientTestUtil.getHazelcastClientInstanceImpl(client);
                ClientInvocationService invocationService = clientImpl.getInvocationService();
                SmartClientInvocationService smartInvocationService = (SmartClientInvocationService)invocationService;
                Field invocationsField = SmartClientInvocationService.class.getSuperclass().getDeclaredField("invocations");
                invocationsField.setAccessible(true);
                return (ConcurrentMap)invocationsField.get(smartInvocationService);
            }
            catch (Exception e) {
                throw ExceptionUtil.rethrow((Throwable)e);
            }
        }
    }
}

