/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.ignite;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.spi.cluster.ignite.impl.AsyncMapImpl;
import io.vertx.spi.cluster.ignite.impl.AsyncMultiMapImpl;
import io.vertx.spi.cluster.ignite.impl.MapImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;

/**
 * Vert.x {@link ClusterManager} implementation backed by an Apache Ignite node.
 *
 * <p>The manager either starts its own Ignite node when joining (from a supplied or
 * classpath-loaded {@link IgniteConfiguration}) or reuses an {@link Ignite} instance handed to
 * the constructor. Maps are served from Ignite caches, counters from Ignite atomic longs and
 * locks from Ignite queues of capacity one.
 */
public class IgniteClusterManager
implements ClusterManager {
    private static final Logger log = LoggerFactory.getLogger(IgniteClusterManager.class);
    /** Classpath resource used when {@link #CONFIG_FILE} cannot be found. */
    private static final String DEFAULT_CONFIG_FILE = "default-ignite.xml";
    /** Classpath resource looked up first when no configuration was supplied. */
    private static final String CONFIG_FILE = "ignite.xml";
    /**
     * Name of the cache configuration whose atomicity mode and backup count are copied into the
     * collection configuration used for lock queues.
     */
    public static final String VERTX_CACHE_TEMPLATE_NAME = "*";
    /** Prefix of the Ignite instance name; the node UUID is appended to it. */
    private static final String VERTX_NODE_PREFIX = "vertx.ignite.node.";
    /** Expiry policy applied to every cache handed out by this manager. */
    private static final ExpiryPolicy DEFAULT_EXPIRY_POLICY = new ClearExpiryPolicy();
    /** Names of the locks this node is currently trying to acquire (one entry per in-flight request). */
    private final Queue<String> pendingLocks = new ConcurrentLinkedQueue<String>();
    private Vertx vertx;
    /** Configuration used to start the node; {@code null} means "load it from the classpath on join". */
    private IgniteConfiguration cfg;
    /** The Ignite node in use; started on join unless it was supplied to the constructor. */
    private Ignite ignite;
    /** {@code true} when {@link #ignite} was supplied by the caller and must not be closed on leave. */
    private boolean customIgnite;
    /** Starts as a random UUID and is replaced by the local Ignite node id on join. */
    private String nodeID = UUID.randomUUID().toString();
    private NodeListener nodeListener;
    /** Local discovery listener registered on join; kept so it can be unregistered on leave. */
    private IgnitePredicate<Event> eventListener;
    /** Whether this manager has joined and not yet left the cluster. */
    private volatile boolean active;
    /** Monitor used by join and leave. */
    private final Object monitor = new Object();
    /** Configuration used when creating lock queues. */
    private CollectionConfiguration collectionCfg;
    /** Executor on which lock releases are performed; created on join and shut down on leave. */
    private ExecutorService lockReleaseExec;

    /**
     * Creates a cluster manager that starts its own Ignite node on {@link #join}, using the
     * configuration found on the classpath.
     *
     * <p>As a side effect this sets the JVM-wide system property {@code IGNITE_NO_SHUTDOWN_HOOK}
     * to {@code "true"}.
     */
    public IgniteClusterManager() {
        // NOTE(review): presumably stops Ignite from installing its own JVM shutdown hook so that
        // shutdown is driven by leave() - confirm against the Ignite system-property docs.
        System.setProperty("IGNITE_NO_SHUTDOWN_HOOK", "true");
    }

    /**
     * Creates a cluster manager that starts its own Ignite node on {@link #join} from the given
     * configuration.
     *
     * <p>The configuration is modified in place: its node id and instance name are overwritten
     * with values derived from this manager's generated node id.
     *
     * @param cfg Ignite configuration to start the node with
     */
    public IgniteClusterManager(IgniteConfiguration cfg) {
        this.cfg = cfg;
        this.setNodeID(cfg);
    }

    /**
     * Creates a cluster manager that starts its own Ignite node on {@link #join} from the
     * configuration found at the given URL.
     *
     * <p>The configuration is loaded eagerly, inside this constructor.
     *
     * @param configFile location of the Ignite configuration
     * @throws RuntimeException if Ignite fails to load the configuration
     */
    public IgniteClusterManager(URL configFile) {
        this.cfg = this.loadConfiguration(configFile);
    }

    /**
     * Creates a cluster manager on top of an Ignite instance owned by the caller.
     *
     * <p>Such an instance is neither started on join nor closed on leave; only the discovery
     * listener is registered and unregistered.
     *
     * @param ignite running Ignite instance to use, must not be {@code null}
     * @throws NullPointerException if {@code ignite} is {@code null}
     */
    public IgniteClusterManager(Ignite ignite) {
        this.ignite = Objects.requireNonNull(ignite, "Ignite instance can't be null.");
        this.customIgnite = true;
    }

    /**
     * Returns the Ignite instance used by this manager.
     *
     * @return the Ignite instance, or {@code null} if none was supplied and none has been started yet
     */
    public Ignite getIgniteInstance() {
        return ignite;
    }

    /**
     * Stores the Vert.x instance on which all blocking Ignite work of this manager is scheduled.
     *
     * @param vertx the Vert.x instance
     */
    public void setVertx(Vertx vertx) {
        this.vertx = vertx;
    }

    /**
     * Registers the listener to be told about nodes joining or leaving the cluster.
     *
     * @param nodeListener listener to notify; a later call replaces the previous listener
     */
    public void nodeListener(NodeListener nodeListener) {
        this.nodeListener = nodeListener;
    }

    /**
     * Asynchronously obtains the distributed multi-map called {@code name}.
     *
     * <p>The backing Ignite cache is looked up (or created) on a worker thread.
     *
     * @param name    name of the backing cache
     * @param handler receives the multi-map, or the failure raised while obtaining the cache
     */
    public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
        this.vertx.executeBlocking(fut -> {
            AsyncMultiMapImpl multiMap = new AsyncMultiMapImpl(this.getCache(name), this.vertx);
            fut.complete(multiMap);
        }, handler);
    }

    /**
     * Asynchronously obtains the distributed map called {@code name}.
     *
     * <p>The backing Ignite cache is looked up (or created) on a worker thread.
     *
     * @param name    name of the backing cache
     * @param handler receives the map, or the failure raised while obtaining the cache
     */
    public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> handler) {
        this.vertx.executeBlocking(fut -> {
            AsyncMapImpl asyncMap = new AsyncMapImpl(this.getCache(name), this.vertx);
            fut.complete(asyncMap);
        }, handler);
    }

    /**
     * Returns a synchronous {@link Map} view over the Ignite cache called {@code name}.
     *
     * <p>The cache is looked up (or created) on the calling thread.
     *
     * @param name name of the backing cache
     * @return a map backed by that cache
     */
    public <K, V> Map<K, V> getSyncMap(String name) {
        IgniteCache<K, V> backingCache = this.getCache(name);
        return new MapImpl<K, V>(backingCache);
    }

    /**
     * Asynchronously acquires the cluster-wide lock called {@code name}.
     *
     * <p>A lock is modelled as an Ignite queue of capacity one: the node whose id sits in the queue
     * holds the lock, and acquiring it means successfully offering this node's id. If the offer
     * times out, the current holder is inspected; when the lock was released in the meantime or
     * the holder is no longer part of the cluster, any stale entry is removed and the offer is
     * retried once (waiting up to {@code timeout} again).
     *
     * <p>The work runs unordered on a Vert.x worker thread.
     *
     * @param name    name of the lock
     * @param timeout maximum time to wait for each offer, in milliseconds
     * @param handler receives the acquired {@link Lock}, or a {@link VertxException} on timeout or
     *                on any error raised while talking to Ignite
     */
    public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> handler) {
        this.vertx.executeBlocking(fut -> {
            boolean locked;
            // Register before touching Ignite: the finally block below must only ever remove the
            // entry added by this request, never one belonging to a concurrent request.
            this.pendingLocks.offer(name);
            try {
                IgniteQueue<String> queue = this.getQueue(name, true);
                String self = this.getNodeID();
                locked = queue.offer(self, timeout, TimeUnit.MILLISECONDS);
                if (!locked) {
                    // The node-left event may already have been handled (or missed), so check
                    // whether the holder still exists before giving up.
                    String ownerId = queue.peek();
                    // A null owner means the lock was released between the timed-out offer and
                    // the peek; there is nothing to look up and nothing to evict.
                    boolean ownerGone = ownerId == null
                        || this.ignite.cluster().forNodeId(UUID.fromString(ownerId)).node() == null;
                    if (ownerGone) {
                        if (ownerId != null) {
                            queue.remove(ownerId);
                        }
                        locked = queue.offer(self, timeout, TimeUnit.MILLISECONDS);
                    }
                }
            }
            catch (Exception e) {
                throw new VertxException("Error during getting lock " + name, e);
            }
            finally {
                this.pendingLocks.remove(name);
            }
            if (!locked) {
                throw new VertxException("Timed out waiting to get lock " + name);
            }
            fut.complete(new LockImpl(name));
        }, false, handler);
    }

    /**
     * Asynchronously obtains the cluster-wide counter called {@code name}.
     *
     * <p>The counter is backed by an Ignite atomic long that is created with an initial value of
     * zero if it does not exist yet.
     *
     * @param name    name of the counter
     * @param handler receives the counter, or the failure raised while obtaining it
     */
    public void getCounter(String name, Handler<AsyncResult<Counter>> handler) {
        this.vertx.executeBlocking(fut -> {
            IgniteAtomicLong atomic = this.ignite.atomicLong(name, 0L, true);
            fut.complete(new CounterImpl(atomic));
        }, handler);
    }

    /**
     * Returns this node's id.
     *
     * @return a randomly generated UUID string before joining, the local Ignite node id afterwards
     */
    public String getNodeID() {
        return nodeID;
    }

    /**
     * Lists the ids of all nodes currently reported by the Ignite cluster.
     *
     * @return the node ids as strings
     */
    public List<String> getNodes() {
        return this.ignite.cluster().nodes().stream()
            .map(ClusterNode::id)
            .map(UUID::toString)
            .collect(Collectors.toList());
    }

    /**
     * Joins the Ignite cluster and starts listening for topology changes.
     *
     * <p>The work runs on a Vert.x worker thread while holding the join/leave monitor. On the
     * first call it creates the lock-release executor, starts an Ignite node (unless an instance
     * was supplied to the constructor), adopts the local Ignite node id, prepares the collection
     * configuration used for lock queues and registers a local discovery listener. If start-up
     * fails, the manager is reset to inactive so that the join can be retried. Calling this
     * method while already active changes nothing and simply completes {@code handler}.
     *
     * @param handler completed once the node has joined, or failed with the start-up exception
     */
    public void join(Handler<AsyncResult<Void>> handler) {
        this.vertx.executeBlocking(fut -> {
            // Lock inside the worker task: locking around executeBlocking would only guard the
            // scheduling, not the state changes themselves.
            synchronized (this.monitor) {
                if (!this.active) {
                    this.active = true;
                    this.lockReleaseExec = Executors.newCachedThreadPool(r -> new Thread(r, "vertx-ignite-service-release-lock-thread"));
                    try {
                        this.startNode();
                    }
                    catch (RuntimeException e) {
                        // Undo the bookkeeping so a later join starts from scratch instead of
                        // believing the node is already up.
                        this.active = false;
                        this.lockReleaseExec.shutdownNow();
                        throw e;
                    }
                }
            }
            // Always complete, including when the manager was already active.
            fut.complete();
        }, handler);
    }

    /** Starts (or adopts) the Ignite node and wires up everything that depends on it. */
    private void startNode() {
        if (!this.customIgnite) {
            this.ignite = this.cfg == null ? Ignition.start(this.loadConfiguration()) : Ignition.start(this.cfg);
        }
        this.nodeID = IgniteClusterManager.nodeId(this.ignite.cluster().localNode());
        this.initCollectionConfiguration();
        this.eventListener = this::onDiscoveryEvent;
        // 10, 11 and 12 are Ignite's EVT_NODE_JOINED, EVT_NODE_LEFT and EVT_NODE_FAILED.
        this.ignite.events().localListen(this.eventListener, new int[]{10, 11, 12});
    }

    /** Derives the lock-queue configuration from the Vert.x cache template, or falls back to defaults. */
    private void initCollectionConfiguration() {
        for (CacheConfiguration cacheCfg : this.ignite.configuration().getCacheConfiguration()) {
            // Constant on the left: tolerates a cache configuration without a name.
            if (!VERTX_CACHE_TEMPLATE_NAME.equals(cacheCfg.getName())) {
                continue;
            }
            this.collectionCfg = new CollectionConfiguration();
            this.collectionCfg.setAtomicityMode(cacheCfg.getAtomicityMode());
            this.collectionCfg.setBackups(cacheCfg.getBackups());
            break;
        }
        if (this.collectionCfg == null) {
            this.collectionCfg = new CollectionConfiguration();
        }
    }

    /**
     * Discovery listener body: forwards the event to the node listener on a worker thread.
     *
     * @return {@code false} once the manager is inactive, {@code true} otherwise
     */
    private boolean onDiscoveryEvent(Event event) {
        if (!this.active) {
            // NOTE(review): per the Ignite localListen contract a false result should unregister
            // the listener - confirm against the IgniteEvents documentation.
            return false;
        }
        if (this.nodeListener != null) {
            this.vertx.executeBlocking(f -> {
                if (this.isActive()) {
                    this.notifyNodeListener(event);
                }
                f.complete();
            }, null);
        }
        return true;
    }

    /** Translates a discovery event into the matching node-listener callback. */
    private void notifyNodeListener(Event event) {
        switch (event.type()) {
            case 10: {
                // Node joined.
                this.nodeListener.nodeAdded(IgniteClusterManager.nodeId(((DiscoveryEvent)event).eventNode()));
                break;
            }
            case 11:
            case 12: {
                // Node left or failed: also free any lock it still holds that we are waiting for.
                String nodeId = IgniteClusterManager.nodeId(((DiscoveryEvent)event).eventNode());
                this.nodeListener.nodeLeft(nodeId);
                this.releasePendingLocksForFailedNode(nodeId);
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Frees every lock this node is currently waiting for that is still held by {@code nodeId}.
     *
     * <p>Each distinct pending lock name is examined once; a lock queue is only touched when it
     * already exists and its head is {@code nodeId}.
     *
     * @param nodeId id of the node that left or failed
     */
    private void releasePendingLocksForFailedNode(String nodeId) {
        HashSet<String> seen = new HashSet<>();
        for (String lockName : this.pendingLocks) {
            if (!seen.add(lockName)) {
                // Several requests for the same lock are pending; one check is enough.
                continue;
            }
            IgniteQueue<String> lockQueue = this.getQueue(lockName, false);
            if (lockQueue == null) {
                continue;
            }
            if (nodeId.equals(lockQueue.peek())) {
                lockQueue.remove(nodeId);
            }
        }
    }

    /**
     * Leaves the cluster.
     *
     * <p>The work runs on a Vert.x worker thread while holding the join/leave monitor. If the
     * manager is active it is marked inactive, the lock-release executor is shut down, and then
     * either the Ignite node started by this manager is closed or, for a caller-supplied
     * instance, only the discovery listener is unregistered. Errors raised while doing so are
     * logged and do not fail {@code handler}.
     *
     * @param handler completed once the manager has left (or was not active in the first place)
     */
    public void leave(Handler<AsyncResult<Void>> handler) {
        this.vertx.executeBlocking(fut -> {
            // Lock inside the worker task: locking around executeBlocking would only guard the
            // scheduling, not the state changes themselves.
            synchronized (this.monitor) {
                if (this.active) {
                    this.active = false;
                    this.lockReleaseExec.shutdown();
                    try {
                        if (!this.customIgnite) {
                            this.ignite.close();
                        } else if (this.eventListener != null) {
                            // Same event types as registered on join: node joined / left / failed.
                            this.ignite.events().stopLocalListen(this.eventListener, new int[]{10, 11, 12});
                        }
                    }
                    catch (Exception e) {
                        // Best effort: pass the exception as the cause so its stack trace is kept.
                        log.error("Error while leaving the Ignite cluster", e);
                    }
                }
            }
            fut.complete();
        }, handler);
    }

    /**
     * Tells whether this manager has joined the cluster and not yet left it.
     *
     * @return {@code true} between a join and the following leave
     */
    public boolean isActive() {
        return active;
    }

    /**
     * Loads the first Ignite configuration defined at {@code config} and stamps this manager's
     * node id onto it.
     *
     * @param config location of the Ignite configuration
     * @return the loaded configuration
     * @throws RuntimeException wrapping the Ignite error if loading fails
     */
    private IgniteConfiguration loadConfiguration(URL config) {
        IgniteConfiguration loaded;
        try {
            loaded = F.first(IgnitionEx.loadConfigurations(config).get1());
        }
        catch (IgniteCheckedException e) {
            log.error("Configuration loading error:", e);
            throw new RuntimeException(e);
        }
        this.setNodeID(loaded);
        return loaded;
    }

    /**
     * Loads the Ignite configuration from the classpath and stamps this manager's node id onto it.
     *
     * <p>{@code ignite.xml} is looked up first through the thread context class loader, then
     * through this class's class loader; if neither has it, {@code default-ignite.xml} is used.
     * The stream opened here is always closed before returning.
     *
     * @return the first configuration defined in the resource
     * @throws IllegalStateException if none of the resources can be found
     * @throws RuntimeException wrapping the Ignite error if loading fails
     */
    private IgniteConfiguration loadConfiguration() {
        ClassLoader ctxClsLoader = Thread.currentThread().getContextClassLoader();
        InputStream is = null;
        if (ctxClsLoader != null) {
            is = ctxClsLoader.getResourceAsStream(CONFIG_FILE);
        }
        if (is == null && (is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_FILE)) == null) {
            is = this.getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE);
            log.info("Using default configuration.");
        }
        if (is == null) {
            // Fail with a clear message instead of handing a null stream to Ignite.
            throw new IllegalStateException("Neither " + CONFIG_FILE + " nor " + DEFAULT_CONFIG_FILE + " was found on the classpath");
        }
        try {
            IgniteConfiguration cfg = F.first(IgnitionEx.loadConfigurations(is).get1());
            this.setNodeID(cfg);
            return cfg;
        }
        catch (IgniteCheckedException e) {
            log.error("Configuration loading error:", e);
            throw new RuntimeException(e);
        }
        finally {
            try {
                is.close();
            }
            catch (IOException e) {
                // A failed close must not turn a successful load into an error.
                log.warn("Failed to close the configuration stream", e);
            }
        }
    }

    /**
     * Writes this manager's node id into {@code cfg}, both as the Ignite node id and as the
     * suffix of the Ignite instance name.
     *
     * @param cfg configuration to modify in place
     */
    private void setNodeID(IgniteConfiguration cfg) {
        UUID id = UUID.fromString(this.nodeID);
        String instanceName = VERTX_NODE_PREFIX + id;
        cfg.setNodeId(id);
        cfg.setIgniteInstanceName(instanceName);
    }

    /**
     * Returns the Ignite cache called {@code name}, creating it if necessary, wrapped with the
     * default expiry policy of this manager.
     *
     * @param name cache name
     * @return the cache view with the expiry policy applied
     */
    private <K, V> IgniteCache<K, V> getCache(String name) {
        return this.ignite.<K, V>getOrCreateCache(name).withExpiryPolicy(DEFAULT_EXPIRY_POLICY);
    }

    /**
     * Looks up the capacity-one Ignite queue that backs the lock called {@code name}.
     *
     * @param name   queue (lock) name
     * @param create when {@code true} the collection configuration is passed to Ignite so the
     *               queue can be created; when {@code false} no configuration is passed
     * @return the queue; callers passing {@code create == false} treat {@code null} as
     *         "queue does not exist"
     */
    private <T> IgniteQueue<T> getQueue(String name, boolean create) {
        CollectionConfiguration queueCfg = null;
        if (create) {
            queueCfg = this.collectionCfg;
        }
        return this.ignite.queue(name, 1, queueCfg);
    }

    /**
     * Converts an Ignite cluster node into the string id used by Vert.x.
     *
     * @param node cluster node
     * @return the node's UUID in string form
     */
    private static String nodeId(ClusterNode node) {
        UUID id = node.id();
        return id.toString();
    }

    /**
     * Expiry policy that reports {@link Duration#ETERNAL} for creation, access and update.
     *
     * <p>It is applied to every cache obtained through {@code getCache}.
     */
    private static class ClearExpiryPolicy
    implements ExpiryPolicy,
    Serializable {
        private ClearExpiryPolicy() {
        }

        /** @return {@link Duration#ETERNAL} for newly created entries */
        public Duration getExpiryForCreation() {
            return Duration.ETERNAL;
        }

        /** @return {@link Duration#ETERNAL} for accessed entries */
        public Duration getExpiryForAccess() {
            return Duration.ETERNAL;
        }

        /** @return {@link Duration#ETERNAL} for updated entries */
        public Duration getExpiryForUpdate() {
            return Duration.ETERNAL;
        }
    }

    /**
     * Vert.x {@link Counter} backed by an Ignite atomic long.
     *
     * <p>Every operation checks that a handler was supplied and then performs the corresponding
     * atomic-long call on a Vert.x worker thread, delivering the result to the handler.
     */
    private class CounterImpl
    implements Counter {
        private final IgniteAtomicLong cnt;

        private CounterImpl(IgniteAtomicLong cnt) {
            this.cnt = cnt;
        }

        /** Reads the current value. */
        public void get(Handler<AsyncResult<Long>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                long current = this.cnt.get();
                fut.complete(current);
            }, handler);
        }

        /** Increments the counter and reports the new value. */
        public void incrementAndGet(Handler<AsyncResult<Long>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                long updated = this.cnt.incrementAndGet();
                fut.complete(updated);
            }, handler);
        }

        /** Increments the counter and reports the value it had before. */
        public void getAndIncrement(Handler<AsyncResult<Long>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                long previous = this.cnt.getAndIncrement();
                fut.complete(previous);
            }, handler);
        }

        /** Decrements the counter and reports the new value. */
        public void decrementAndGet(Handler<AsyncResult<Long>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                long updated = this.cnt.decrementAndGet();
                fut.complete(updated);
            }, handler);
        }

        /** Adds {@code value} and reports the new value. */
        public void addAndGet(long value, Handler<AsyncResult<Long>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                long updated = this.cnt.addAndGet(value);
                fut.complete(updated);
            }, handler);
        }

        /** Adds {@code value} and reports the value the counter had before. */
        public void getAndAdd(long value, Handler<AsyncResult<Long>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                long previous = this.cnt.getAndAdd(value);
                fut.complete(previous);
            }, handler);
        }

        /** Sets the counter to {@code value} if it currently equals {@code expected}; reports whether it did. */
        public void compareAndSet(long expected, long value, Handler<AsyncResult<Boolean>> handler) {
            Objects.requireNonNull(handler, "handler");
            vertx.executeBlocking(fut -> {
                boolean swapped = this.cnt.compareAndSet(expected, value);
                fut.complete(swapped);
            }, handler);
        }
    }

    /**
     * Handle for an acquired lock; the lock is represented by the single entry of the Ignite
     * queue with the same name.
     */
    private class LockImpl
    implements Lock {
        private final String name;

        private LockImpl(String name) {
            this.name = name;
        }

        /**
         * Releases the lock asynchronously on the lock-release executor.
         *
         * <p>Any failure, including the inconsistent-state error, is raised on the executor
         * thread and is not reported to the caller of this method.
         */
        public void release() {
            lockReleaseExec.execute(this::removeOwnerEntry);
        }

        /** Removes the head of the lock queue; an empty queue means the lock was not held. */
        private void removeOwnerEntry() {
            IgniteQueue<String> lockQueue = getQueue(this.name, true);
            if (lockQueue.poll() == null) {
                throw new VertxException("Inconsistent lock state " + this.name);
            }
        }
    }
}

