/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.eventbus;

import io.vertx.core.Completable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.clustered.NodeSelector;
import io.vertx.core.impl.VertxBootstrapImpl;
import io.vertx.core.internal.VertxBootstrap;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.ClusteredNode;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Test;

public class CustomNodeSelectorTest
extends VertxTestBase {
    private List<Vertx> vertices;

    @Test
    public void test() throws Exception {
        CompositeFuture startFuture = IntStream.range(0, 4).mapToObj(i -> {
            VertxOptions vertxOptions = this.getOptions();
            vertxOptions.getEventBusOptions().setClusterNodeMetadata(new JsonObject().put("rack", (Object)(i % 2 == 0 ? "foo" : "bar")));
            return vertxOptions;
        }).map(options -> {
            VertxBootstrapImpl factory = ((VertxBootstrapImpl)VertxBootstrap.create().options(options).init()).clusterManager((ClusterManager)new FakeClusterManager()).clusterNodeSelector((NodeSelector)new CustomNodeSelector());
            return factory.clusteredVertx();
        }).collect(Collectors.collectingAndThen(Collectors.toList(), Future::all));
        CountDownLatch startLatch = new CountDownLatch(1);
        startFuture.onComplete(this.onSuccess(cf -> startLatch.countDown()));
        this.awaitLatch(startLatch);
        this.vertices = startFuture.list();
        ConcurrentHashMap received = new ConcurrentHashMap();
        CountDownLatch latch = new CountDownLatch(8);
        CompositeFuture cf2 = IntStream.range(0, 4).mapToObj(i -> this.vertices.get(i).eventBus().consumer("test", msg -> {
            received.merge(i, Collections.singleton((String)msg.body()), (s1, s2) -> Stream.concat(s1.stream(), s2.stream()).collect(Collectors.toSet()));
            latch.countDown();
        })).map(MessageConsumer::completion).collect(Collectors.collectingAndThen(Collectors.toList(), Future::all));
        HashMap expected = new HashMap();
        cf2.onComplete(this.onSuccess(v -> {
            for (int i = 0; i < 4; ++i) {
                String s = String.valueOf((char)(97 + i));
                this.vertices.get(i).eventBus().publish("test", (Object)s);
                expected.merge(i, Collections.singleton(s), (s1, s2) -> Stream.concat(s1.stream(), s2.stream()).collect(Collectors.toSet()));
                expected.merge((i + 2) % 4, Collections.singleton(s), (s1, s2) -> Stream.concat(s1.stream(), s2.stream()).collect(Collectors.toSet()));
            }
        }));
        this.awaitLatch(latch);
        this.assertEquals(expected, received);
    }

    @Override
    protected void tearDown() throws Exception {
        try {
            if (this.vertices != null) {
                this.close(this.vertices);
            }
        }
        finally {
            super.tearDown();
        }
    }

    private static class CustomNodeSelector
    implements NodeSelector {
        private ClusteredNode clusterManager;
        private String rack;

        private CustomNodeSelector() {
        }

        public void init(ClusteredNode clusterManager) {
            this.clusterManager = clusterManager;
        }

        public void eventBusStarted() {
            this.rack = this.clusterManager.getNodeInfo().metadata().getString("rack");
        }

        public void selectForSend(String address, Completable<String> promise) {
            promise.fail("Not implemented");
        }

        public void selectForPublish(String address, Completable<Iterable<String>> promise) {
            List nodes = this.clusterManager.getNodes();
            CompositeFuture future = nodes.stream().map(nodeId -> {
                Promise nodeInfo = Promise.promise();
                this.clusterManager.getNodeInfo(nodeId, (Completable)nodeInfo);
                return nodeInfo.future();
            }).collect(Collectors.collectingAndThen(Collectors.toList(), Future::all));
            future.map(cf -> {
                ArrayList<String> res = new ArrayList<String>();
                for (int i = 0; i < nodes.size(); ++i) {
                    NodeInfo nodeInfo = (NodeInfo)cf.resultAt(i);
                    if (!nodeInfo.metadata().getString("rack").equals(this.rack)) continue;
                    res.add((String)nodes.get(i));
                }
                return res;
            }).onComplete(promise);
        }
    }
}

