/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.shareddata;

import io.vertx.Lifecycle;
import io.vertx.LoggingTestWatcher;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.ClusteredAsyncMapTest;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.cluster.infinispan.InfinispanAsyncMap;
import io.vertx.ext.cluster.infinispan.InfinispanClusterManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class InfinispanClusteredAsyncMapTest
extends ClusteredAsyncMapTest {
    @Rule
    public LoggingTestWatcher watchman = new LoggingTestWatcher();
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    public void setUp() throws Exception {
        System.setProperty("jgroups.file.location", this.temporaryFolder.newFolder().getAbsolutePath());
        super.setUp();
    }

    protected void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> ar) {
        CountDownLatch latch = new CountDownLatch(1);
        Promise promise = Promise.promise();
        promise.future().onComplete(ar);
        super.clusteredVertx(options, asyncResult -> {
            if (asyncResult.succeeded()) {
                promise.complete(asyncResult.result());
            } else {
                promise.fail(asyncResult.cause());
            }
            latch.countDown();
        });
        try {
            this.assertTrue(latch.await(2L, TimeUnit.MINUTES));
        }
        catch (InterruptedException e) {
            this.fail(e.getMessage());
        }
    }

    protected ClusterManager getClusterManager() {
        return new InfinispanClusterManager();
    }

    @Test
    public void testKeyStream() {
        this.testReadStream(InfinispanAsyncMap::keyStream, (map, keys) -> {
            this.assertEquals(map.size(), keys.size());
            this.assertTrue(keys.containsAll(map.keySet()));
        });
    }

    @Test
    public void testValueStream() {
        this.testReadStream(InfinispanAsyncMap::valueStream, (map, values) -> {
            this.assertEquals(map.size(), values.size());
            this.assertTrue(values.containsAll(map.values()));
            this.assertTrue(map.values().containsAll((Collection<?>)values));
        });
    }

    @Test
    public void testEntryStream() {
        this.testReadStream(InfinispanAsyncMap::entryStream, (map, entries) -> {
            this.assertEquals(map.size(), entries.size());
            this.assertTrue(entries.containsAll(map.entrySet()));
        });
    }

    private <T> void testReadStream(Function<InfinispanAsyncMap<JsonObject, Buffer>, ReadStream<T>> streamFactory, BiConsumer<Map<JsonObject, Buffer>, List<T>> assertions) {
        Map map = this.genJsonToBuffer(100);
        this.loadData(map, (vertx, asyncMap) -> {
            ArrayList items = new ArrayList();
            ReadStream stream = (ReadStream)streamFactory.apply(InfinispanAsyncMap.unwrap((AsyncMap)asyncMap));
            AtomicInteger idx = new AtomicInteger();
            long pause = 500L;
            long start = System.nanoTime();
            stream.endHandler(end -> {
                assertions.accept(map, items);
                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                this.assertTrue(duration >= 3L * pause);
                this.testComplete();
            }).exceptionHandler(t -> this.fail((Throwable)t)).handler(item -> {
                items.add(item);
                int j = idx.getAndIncrement();
                if (j == 3 || j == 16 || j == 38) {
                    stream.pause();
                    int emitted = items.size();
                    vertx.setTimer(pause, tid -> {
                        this.assertTrue("Items emitted during pause", emitted == items.size());
                        stream.resume();
                    });
                }
            });
        });
        this.await();
    }

    @Test
    public void testClosedKeyStream() {
        Map map = this.genJsonToBuffer(100);
        this.loadData(map, (vertx, asyncMap) -> {
            ArrayList keys = new ArrayList();
            ReadStream stream = InfinispanAsyncMap.unwrap((AsyncMap)asyncMap).keyStream();
            stream.exceptionHandler(t -> this.fail((Throwable)t)).handler(jsonObject -> {
                keys.add(jsonObject);
                if (jsonObject.getInteger("key") == 38) {
                    stream.handler(null);
                    int emitted = keys.size();
                    vertx.setTimer(500L, tid -> {
                        this.assertTrue("Items emitted after close", emitted == keys.size());
                        this.testComplete();
                    });
                }
            });
        });
        this.await();
    }

    protected void close(List<Vertx> clustered) throws Exception {
        Lifecycle.close(clustered);
    }
}

