/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.ignite.impl;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.spi.cluster.ignite.impl.IgniteRegistrationInfo;
import io.vertx.spi.cluster.ignite.impl.Throttling;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;

public class SubsMapHelper {
    private final IgniteCache<IgniteRegistrationInfo, Boolean> map;
    private final NodeSelector nodeSelector;
    private final ConcurrentMap<String, Set<RegistrationInfo>> localSubs = new ConcurrentHashMap<String, Set<RegistrationInfo>>();
    private final Throttling throttling;
    private volatile boolean shutdown;

    public SubsMapHelper(Ignite ignite, NodeSelector nodeSelector, VertxInternal vertxInternal) {
        this.map = ignite.getOrCreateCache("__vertx.subs");
        this.nodeSelector = nodeSelector;
        this.throttling = new Throttling(vertxInternal, this::getAndUpdate);
        this.shutdown = false;
        this.map.query((Query)new ContinuousQuery().setAutoUnsubscribe(true).setTimeInterval(100L).setPageSize(128).setLocalListener(l -> this.listen(l, vertxInternal)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void get(String address, Promise<List<RegistrationInfo>> promise) {
        if (this.shutdown) {
            promise.complete(null);
            return;
        }
        try {
            List infos = this.map.query((Query)new ScanQuery((IgniteBiPredicate & Serializable)(k, v) -> k.address().equals(address))).getAll().stream().map(Cache.Entry::getKey).map(IgniteRegistrationInfo::registrationInfo).collect(Collectors.toList());
            int size = infos.size();
            Set local = (Set)this.localSubs.get(address);
            if (local != null) {
                Set set = local;
                synchronized (set) {
                    if ((size += local.size()) == 0) {
                        promise.complete(Collections.emptyList());
                        return;
                    }
                    infos.addAll(local);
                }
            } else if (size == 0) {
                promise.complete(Collections.emptyList());
                return;
            }
            promise.complete(infos);
        }
        catch (IllegalStateException | CacheException e) {
            promise.fail((Throwable)new VertxException(e));
        }
    }

    public Future<Void> put(String address, RegistrationInfo registrationInfo) {
        if (this.shutdown) {
            return Future.failedFuture((Throwable)new VertxException("shutdown in progress"));
        }
        try {
            if (registrationInfo.localOnly()) {
                this.localSubs.compute(address, (add, curr) -> this.addToSet(registrationInfo, (Set<RegistrationInfo>)curr));
                this.fireRegistrationUpdateEvent(address);
            } else {
                this.map.put((Object)new IgniteRegistrationInfo(address, registrationInfo), (Object)Boolean.TRUE);
            }
        }
        catch (IllegalStateException | CacheException e) {
            return Future.failedFuture((Throwable)new VertxException(e));
        }
        return Future.succeededFuture();
    }

    private Set<RegistrationInfo> addToSet(RegistrationInfo registrationInfo, Set<RegistrationInfo> curr) {
        Set<RegistrationInfo> res = curr != null ? curr : Collections.synchronizedSet(new LinkedHashSet());
        res.add(registrationInfo);
        return res;
    }

    public void remove(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
        if (this.shutdown) {
            promise.complete();
            return;
        }
        try {
            if (registrationInfo.localOnly()) {
                this.localSubs.computeIfPresent(address, (add, curr) -> this.removeFromSet(registrationInfo, (Set<RegistrationInfo>)curr));
                this.fireRegistrationUpdateEvent(address);
            } else {
                this.map.remove((Object)new IgniteRegistrationInfo(address, registrationInfo));
            }
            promise.complete();
        }
        catch (IllegalStateException | CacheException e) {
            promise.fail((Throwable)new VertxException(e));
        }
    }

    private Set<RegistrationInfo> removeFromSet(RegistrationInfo registrationInfo, Set<RegistrationInfo> curr) {
        curr.remove(registrationInfo);
        return curr.isEmpty() ? null : curr;
    }

    public void removeAllForNode(String nodeId) {
        TreeSet toRemove = this.map.query((Query)new ScanQuery((IgniteBiPredicate & Serializable)(k, v) -> k.registrationInfo().nodeId().equals(nodeId))).getAll().stream().map(Cache.Entry::getKey).collect(Collectors.toCollection(TreeSet::new));
        try {
            this.map.removeAll((Set)toRemove);
        }
        catch (IllegalStateException | CacheException throwable) {
            // empty catch block
        }
    }

    public void leave() {
        this.shutdown = true;
    }

    private void fireRegistrationUpdateEvent(String address) {
        this.throttling.onEvent(address);
    }

    private Future<List<RegistrationInfo>> getAndUpdate(String address) {
        Promise prom = Promise.promise();
        if (this.nodeSelector.wantsUpdatesFor(address)) {
            prom.future().onSuccess(registrationInfos -> this.nodeSelector.registrationsUpdated(new RegistrationUpdateEvent(address, registrationInfos)));
            this.get(address, (Promise<List<RegistrationInfo>>)prom);
        } else {
            prom.complete();
        }
        return prom.future();
    }

    private void listen(Iterable<CacheEntryEvent<? extends IgniteRegistrationInfo, ? extends Boolean>> events, VertxInternal vertxInternal) {
        vertxInternal.executeBlocking(promise -> {
            StreamSupport.stream(events.spliterator(), false).map(e -> ((IgniteRegistrationInfo)e.getKey()).address()).distinct().forEach(this::fireRegistrationUpdateEvent);
            promise.complete();
        });
    }
}

