/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.core;

import co.paralleluniverse.common.MonitoringType;
import co.paralleluniverse.common.logging.LoggingUtils;
import co.paralleluniverse.common.spring.Service;
import co.paralleluniverse.common.util.DegenerateInvocationHandler;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.core.Backup;
import co.paralleluniverse.galaxy.core.BackupMonitor;
import co.paralleluniverse.galaxy.core.Cache;
import co.paralleluniverse.galaxy.core.ClusterService;
import co.paralleluniverse.galaxy.core.Comm;
import co.paralleluniverse.galaxy.core.JMXBackupMonitor;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.MetricsBackupMonitor;
import co.paralleluniverse.galaxy.core.NodeNotFoundException;
import co.paralleluniverse.galaxy.core.ServerComm;
import co.paralleluniverse.galaxy.core.SlaveComm;
import java.beans.ConstructorProperties;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;

public class BackupImpl
extends ClusterService
implements Backup {
    private static final Logger LOG = LoggerFactory.getLogger(BackupImpl.class);
    private long maxDelayNanos = TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MILLISECONDS);
    private final Comm serverComm;
    private final SlaveComm slaveComm;
    private Cache cache;
    private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
    private NonBlockingHashMapLong<BackupEntry> map;
    private final NonBlockingHashMapLong<BackupEntry> map1 = new NonBlockingHashMapLong();
    private final NonBlockingHashMapLong<BackupEntry> map2 = new NonBlockingHashMapLong();
    private volatile boolean copyImmediately;
    private final ReentrantLock currentBackupsLock = new ReentrantLock();
    private final Condition currentBackupsPossiblyReady = this.currentBackupsLock.newCondition();
    private final Map<Long, Message.BACKUP> currentBackups = new HashMap<Long, Message.BACKUP>();
    private long nextId = 100000L;
    private Message.BACKUP_PACKET lastSent;
    private volatile boolean awaitServer;
    private volatile boolean awaitSlaves;
    private boolean shouldFlush;
    private long lastFlush;
    private volatile boolean completedReplication = false;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final BackupMonitor monitor;

    @ConstructorProperties(value={"name", "cluster", "serverComm", "slaveComm", "monitoringType"})
    public BackupImpl(String name, Cluster cluster, ServerComm serverComm, SlaveComm slaveComm, MonitoringType monitoringType) {
        this(name, cluster, serverComm, slaveComm, BackupImpl.createMonitor(monitoringType, name));
    }

    BackupImpl(String name, Cluster cluster, ServerComm serverComm, SlaveComm slaveComm, BackupMonitor monitor) {
        super(name, cluster);
        this.monitor = monitor;
        if (cluster.hasServer() && serverComm == null) {
            throw new RuntimeException("Configured to have server but serverComm is null!");
        }
        this.serverComm = serverComm;
        this.slaveComm = slaveComm;
        if (slaveComm != null) {
            slaveComm.setBackup(this);
        }
        this.map = this.map1;
    }

    static BackupMonitor createMonitor(MonitoringType monitoringType, String name) {
        if (monitoringType == null) {
            return (BackupMonitor)Proxy.newProxyInstance(Cache.class.getClassLoader(), new Class[]{BackupMonitor.class}, (InvocationHandler)DegenerateInvocationHandler.INSTANCE);
        }
        switch (monitoringType) {
            case JMX: {
                return new JMXBackupMonitor(name);
            }
            case METRICS: {
                return new MetricsBackupMonitor();
            }
        }
        throw new IllegalArgumentException("Unknown MonitoringType " + (Object)((Object)monitoringType));
    }

    public void setMaxDelay(int maxDelayMillis) {
        this.assertDuringInitialization();
        this.maxDelayNanos = TimeUnit.NANOSECONDS.convert(maxDelayMillis, TimeUnit.MILLISECONDS);
    }

    @ManagedAttribute
    public int getMaxDelay() {
        return (int)TimeUnit.MILLISECONDS.convert(this.maxDelayNanos, TimeUnit.NANOSECONDS);
    }

    @Override
    public void init() throws Exception {
        if (this.serverComm instanceof Service) {
            this.removeDependency((Service)((Object)this.serverComm));
        }
        super.init();
    }

    @Override
    protected void postInit() throws Exception {
        ((Service)((Object)this.getCluster())).awaitAvailable();
        if (this.getCluster().getMaster(this.getCluster().getMyNodeId()) == null) {
            this.setReady(true);
        }
        super.postInit();
    }

    @Override
    protected void start(boolean master) {
        if (master) {
            this.startFlushThread();
        }
    }

    @Override
    public void switchToMaster() {
        super.switchToMaster();
        if (!this.isAvailable() || !this.completedReplication) {
            LOG.info("Node has not completed replication so cannot become master. Going offline!");
            this.getCluster().goOffline();
        } else {
            this.startFlushThread();
        }
    }

    @Override
    protected void shutdown() {
        super.shutdown();
        this.scheduler.shutdownNow();
    }

    @Override
    public void setCache(Cache cache) {
        this.assertDuringInitialization();
        this.cache = cache;
    }

    private void startFlushThread() {
        this.scheduler.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                BackupImpl.this.flushNow();
            }
        }, this.maxDelayNanos, this.maxDelayNanos, TimeUnit.NANOSECONDS);
    }

    @Override
    public boolean inv(long id, short owner) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("INV {}, {}", (Object)id, (Object)owner);
            }
            return !this.slaveComm.send(Message.INV(this.getCluster().getMyNodeId(), id, owner));
        }
        catch (NodeNotFoundException e) {
            throw new AssertionError((Object)e);
        }
    }

    @Override
    public boolean startBackup() {
        LOG.debug("start backup");
        this.mapLock.readLock().lock();
        if (this.copyImmediately) {
            this.currentBackupsLock.lock();
            if (!this.copyImmediately) {
                this.currentBackupsLock.unlock();
            } else {
                return true;
            }
        }
        return false;
    }

    @Override
    public void endBackup(boolean locked) {
        LOG.debug("end backup");
        this.mapLock.readLock().unlock();
        if (locked) {
            this.currentBackupsPossiblyReady.signal();
            this.currentBackupsLock.unlock();
        }
    }

    @Override
    public void backup(long id, long version) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Backup: {} ver: {} {}", new Object[]{LoggingUtils.hex(id), version, this.copyImmediately ? "(COPY)" : ""});
        }
        if (this.copyImmediately) {
            this.currentBackups.put(id, this.makeBackup(this.cache.getLine(id), version));
            this.oldMap().remove(id);
        } else {
            this.map.put(id, new BackupEntry(id, version));
        }
    }

    @Override
    public void flush() {
        this.scheduler.submit(new Runnable(){

            @Override
            public void run() {
                BackupImpl.this.flushNow();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushNow() {
        try {
            Message.BACKUP_PACKET packet;
            Message.BACKUP backup;
            Cache.CacheLine cacheLine;
            Cache.CacheLine line;
            BackupEntry be;
            Iterator<BackupEntry> it;
            NonBlockingHashMapLong<BackupEntry> oldMap = this.map;
            this.mapLock.writeLock().lock();
            try {
                if (oldMap.isEmpty()) {
                    return;
                }
                this.switchMaps();
            }
            finally {
                this.mapLock.writeLock().unlock();
            }
            LOG.debug("FLUSHING");
            this.currentBackupsLock.lock();
            try {
                assert (!this.copyImmediately);
                it = oldMap.values().iterator();
                while (it.hasNext()) {
                    be = it.next();
                    line = this.cache.getLine(be.id);
                    assert (line != null);
                    cacheLine = line;
                    synchronized (cacheLine) {
                        backup = this.makeBackup(line, be.version);
                        if (backup != null) {
                            oldMap.remove(be.id);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Copied {} ver {} for backup", (Object)LoggingUtils.hex(be.id), (Object)be.version);
                            }
                            this.currentBackups.put(be.id, backup);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Matching version for {} ({}) not found", (Object)LoggingUtils.hex(be.id), (Object)be.version);
                            }
                            this.copyImmediately = true;
                        }
                    }
                    it.remove();
                }
            }
            finally {
                this.currentBackupsLock.unlock();
            }
            if (this.copyImmediately) {
                LOG.debug("Incomplete backups. Completeing.");
                this.mapLock.writeLock().lock();
                this.currentBackupsLock.lock();
                try {
                    it = this.map.values().iterator();
                    while (it.hasNext()) {
                        be = it.next();
                        line = this.cache.getLine(be.id);
                        assert (line != null);
                        cacheLine = line;
                        synchronized (cacheLine) {
                            backup = this.makeBackup(line, be.version);
                            if (backup != null) {
                                this.map.remove(be.id);
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Copied {} ver {} for backup", (Object)LoggingUtils.hex(be.id), (Object)be.version);
                                }
                                this.currentBackups.put(be.id, backup);
                            } else {
                                oldMap.put(be.id, be);
                            }
                        }
                        it.remove();
                    }
                }
                finally {
                    this.currentBackupsLock.unlock();
                    this.mapLock.writeLock().unlock();
                }
                this.currentBackupsLock.lock();
                try {
                    it = oldMap.values().iterator();
                    while (it.hasNext()) {
                        be = it.next();
                        Message.BACKUP backup2 = this.currentBackups.get(be.id);
                        if (backup2 == null || backup2.getVersion() < be.version) continue;
                        it.remove();
                    }
                    while (!oldMap.isEmpty()) {
                        LOG.debug("Waiting for missing transactions: {}", oldMap);
                        this.currentBackupsPossiblyReady.await();
                    }
                    this.copyImmediately = false;
                }
                finally {
                    this.currentBackupsLock.unlock();
                }
            }
            if ((packet = this.flush1()) != null) {
                this.send(packet);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Message.BACKUP_PACKET flush1() {
        this.currentBackupsLock.lock();
        try {
            if (this.lastSent == null) {
                this.shouldFlush = false;
                this.lastFlush = System.nanoTime();
                if (this.currentBackups.isEmpty()) {
                    Message.BACKUP_PACKET bACKUP_PACKET = null;
                    return bACKUP_PACKET;
                }
                Message.BACKUP_PACKET packet = Message.BACKUP_PACKET(this.nextId, this.currentBackups.values());
                ++this.nextId;
                this.lastSent = packet;
                this.currentBackups.clear();
                Message.BACKUP_PACKET bACKUP_PACKET = packet;
                return bACKUP_PACKET;
            }
            LOG.debug("Last backup not acked. Not sending.");
            long passedMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - this.lastFlush, TimeUnit.NANOSECONDS);
            if (passedMillis > 2000L) {
                LOG.warn("SLAVE HAS NOT ACKED IN {} MILLISECONDS. SOMETHING IS SERIOUSLY WRONG!", (Object)passedMillis);
            }
            this.shouldFlush = true;
            Message.BACKUP_PACKET bACKUP_PACKET = null;
            return bACKUP_PACKET;
        }
        finally {
            this.currentBackupsLock.unlock();
        }
    }

    private void send(Message.BACKUP_PACKET packet) {
        this.monitor.addBackupPacket();
        this.monitor.addBackups(packet.getBackups().size());
        try {
            this.awaitServer = true;
            this.awaitSlaves = true;
            if (this.serverComm != null) {
                LOG.debug("Sending backup packet to server: {}", (Object)packet);
                this.serverComm.send(packet);
            } else {
                this.ack(true);
            }
            if (!this.slaveComm.send(packet)) {
                this.ack(false);
            } else {
                LOG.debug("Sent backup packet to slaves: {}", (Object)packet);
            }
        }
        catch (NodeNotFoundException e) {
            throw new RuntimeException("Server not found!", e);
        }
    }

    private void switchMaps() {
        this.map = this.map == this.map1 ? this.map2 : this.map1;
    }

    private NonBlockingHashMapLong<BackupEntry> oldMap() {
        return this.map == this.map1 ? this.map2 : this.map1;
    }

    private Message.BACKUP makeBackup(Cache.CacheLine line, long version) {
        Message.BACKUP backup;
        if (line.getVersion() != version) {
            return null;
        }
        if (line.getData() == null) {
            backup = Message.BACKUP(line.getId(), line.getVersion(), null);
        } else {
            ByteBuffer buffer = ByteBuffer.allocate(line.getData().limit());
            line.rewind();
            buffer.put(line.getData());
            line.rewind();
            buffer.flip();
            backup = Message.BACKUP(line.getId(), line.getVersion(), buffer);
        }
        LOG.debug("Copying version {} of line {} data: {}", new Object[]{backup.getVersion(), LoggingUtils.hex(backup.getLine()), backup.getData() != null ? "(" + backup.getData().remaining() + " bytes)" : "null"});
        return backup;
    }

    private void serverAck(Message message) {
        Message.BACKUP_PACKETACK ack = (Message.BACKUP_PACKETACK)message;
        if (ack.getId() != this.lastSent.getId()) {
            LOG.warn("Received backup ack from server with id {} which is different from last sent: {}", (Object)ack.getId(), (Object)this.lastSent.getId());
            return;
        }
        this.ack(true);
    }

    @Override
    public void slavesAck(long id) {
        if (this.lastSent == null) {
            LOG.warn("Received backup ack from slaves with id {} but lastSent is null", (Object)id);
            return;
        }
        if (id != this.lastSent.getId()) {
            LOG.warn("Received backup ack from slaves with id {} which is different from last sent: {}", (Object)id, (Object)this.lastSent.getId());
            return;
        }
        this.ack(false);
    }

    @Override
    public void slavesInvAck(long id) {
        this.cache.receive(Message.INVACK(this.getCluster().getMyNodeId(), id));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ack(boolean server) {
        Message.BACKUP_PACKET _lastSent;
        LOG.debug("Ack {}", (Object)(server ? "server" : "slaves"));
        Message.BACKUP_PACKET packet = null;
        this.currentBackupsLock.lock();
        try {
            if (server && this.awaitSlaves) {
                this.awaitServer = false;
                return;
            }
            if (!server && this.awaitServer) {
                this.awaitSlaves = false;
                return;
            }
            _lastSent = this.lastSent;
            this.lastSent = null;
            this.awaitServer = false;
            this.awaitSlaves = false;
            if (this.shouldFlush) {
                packet = this.flush1();
            }
        }
        finally {
            this.currentBackupsLock.unlock();
        }
        for (Message.BACKUP backup : _lastSent.getBackups()) {
            this.cache.receive(Message.BACKUPACK((short)0, backup.getLine(), backup.getVersion()).setIncoming());
        }
        if (packet != null) {
            this.send(packet);
        }
    }

    @Override
    public Iterator<Message.BACKUP> iterOwned() {
        final Iterator<Cache.CacheLine> it = this.cache.ownedIterator();
        return new Iterator<Message.BACKUP>(){

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Message.BACKUP next() {
                Cache.CacheLine line;
                Cache.CacheLine cacheLine = line = (Cache.CacheLine)it.next();
                synchronized (cacheLine) {
                    BackupImpl.this.monitor.addReplicationBackup(1);
                    return (Message.BACKUP)Message.BACKUP(line.getId(), line.getVersion(), line.getData()).cloneDataBuffers();
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    @Override
    public void receive(Message message) {
        switch (message.getType()) {
            case BACKUP_PACKETACK: {
                this.serverAck(message);
                break;
            }
            case BACKUP_PACKET: {
                if (this.getCluster().isMaster()) {
                    LOG.warn("Received backup packet while master: {}", (Object)message);
                    break;
                }
                this.monitor.addBackupPacket();
                this.monitor.addBackups(((Message.BACKUP_PACKET)message).getBackups().size());
                this.handleReceivedBackupPacket((Message.BACKUP_PACKET)message);
                break;
            }
            case BACKUP: {
                if (this.getCluster().isMaster()) {
                    LOG.warn("Received backup while master: {}", (Object)message);
                    break;
                }
                this.monitor.addReplicationBackup(1);
                this.handleReceivedBackup((Message.BACKUP)message);
                break;
            }
            case INV: {
                if (this.getCluster().isMaster()) {
                    LOG.warn("Received INV while master: {}", (Object)message);
                    break;
                }
                this.handleReceivedInvalidate((Message.INV)message);
                break;
            }
        }
    }

    private void handleReceivedBackupPacket(Message.BACKUP_PACKET packet) {
        try {
            LOG.debug("Received backup packet: {}", (Object)packet);
            for (Message.BACKUP backup : packet.getBackups()) {
                this.cache.receive(backup);
            }
            this.slaveComm.send(Message.BACKUP_PACKETACK(packet));
        }
        catch (NodeNotFoundException e) {
            LOG.error("Exception while sending backup ack", (Throwable)e);
        }
    }

    private void handleReceivedBackup(Message.BACKUP backup) {
        LOG.debug("Received replication backup: {}", (Object)backup);
        if (backup.getLine() < 0L) {
            LOG.info("Slave node now ready! (completed replication)");
            this.completedReplication = true;
            this.setReady(true);
        } else {
            this.cache.receive(backup);
        }
    }

    private void handleReceivedInvalidate(Message.INV inv) {
        try {
            LOG.debug("Received inv: {}", (Object)inv);
            this.cache.receive(inv);
            this.slaveComm.send(Message.INVACK(inv));
        }
        catch (NodeNotFoundException e) {
            throw new AssertionError((Object)e);
        }
    }

    private static class BackupEntry {
        public final long id;
        public final long version;

        public BackupEntry(long id, long version) {
            this.id = id;
            this.version = version;
        }

        public String toString() {
            return "BackupEntry{id: " + Long.toHexString(this.id) + ", version: " + this.version + '}';
        }
    }
}

