/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.jcr.bus;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.modeshape.common.annotation.ThreadSafe;
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.HashCode;
import org.modeshape.jcr.bus.ChangeBus;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.cache.change.ChangeSetListener;

/**
 * A {@link ChangeBus} that forwards every {@link ChangeSet} to the registered {@link ChangeSetListener}s.
 * <p>
 * Each registered listener gets its own {@link ChangeSetDispatcher}, which is submitted to the supplied
 * {@link ExecutorService} and delivers queued change sets to that listener. A change set is instead delivered
 * directly on the thread calling {@link #notify(ChangeSet)} when the listener was registered through
 * {@link #registerInThread(ChangeSetListener)}, or when the change set's workspace name matches the system
 * workspace name (ignoring case).
 * <p>
 * Listeners are identified by the value of {@code HashCode.compute} applied to the listener, so two listeners
 * that produce the same value are treated as the same registration.
 * <p>
 * {@code dispatchers} and {@code workers} are only read or modified while holding {@code listenersLock}.
 * Direct (in-thread) delivery happens while the read lock is held, and the lock does not support upgrading a
 * read lock to a write lock; a listener must therefore not register or unregister a listener or shut the bus
 * down from inside such a callback.
 */
@ThreadSafe
public final class RepositoryChangeBus implements ChangeBus {
    protected static final Logger LOGGER = Logger.getLogger(RepositoryChangeBus.class);

    /** Set once by {@link #shutdown()}; dispatcher loops poll it to know when to stop. */
    protected volatile boolean shutdown;

    private final ExecutorService executor;
    /** One dispatcher per registered listener; guarded by {@code listenersLock}. */
    private final List<ChangeSetDispatcher> dispatchers;
    /** Running dispatcher tasks keyed by listener hash code; guarded by {@code listenersLock}. */
    private final Map<Integer, Future<?>> workers;
    private final ReadWriteLock listenersLock;
    private final String systemWorkspaceName;

    /**
     * Creates a bus with no listeners.
     *
     * @param executor the service on which one dispatcher task per listener is run; {@link #shutdown()} also
     *        shuts this executor down
     * @param systemWorkspaceName the workspace name whose change sets are always delivered on the notifying
     *        thread; it is dereferenced by {@link #notify(ChangeSet)}, so it is expected to be non-null
     */
    public RepositoryChangeBus(ExecutorService executor, String systemWorkspaceName) {
        this.systemWorkspaceName = systemWorkspaceName;
        this.workers = new HashMap<Integer, Future<?>>();
        this.dispatchers = new ArrayList<ChangeSetDispatcher>();
        // Fair mode, as in the original implementation.
        this.listenersLock = new ReentrantReadWriteLock(true);
        this.executor = executor;
        this.shutdown = false;
    }

    /**
     * Does nothing; dispatcher tasks are started lazily as listeners register.
     */
    @Override
    public synchronized void start() {
    }

    /**
     * Marks the bus as shut down, forgets all listeners, shuts down the executor and cancels every dispatcher
     * task that is still running.
     * <p>
     * The whole operation runs under the write lock so that it cannot interleave with a notification that is
     * iterating the dispatcher list or with a concurrent (un)registration.
     */
    @Override
    public synchronized void shutdown() {
        this.listenersLock.writeLock().lock();
        try {
            this.shutdown = true;
            this.dispatchers.clear();
            this.stopWork();
        } finally {
            this.listenersLock.writeLock().unlock();
        }
    }

    /**
     * Shuts the executor down and interrupts all unfinished dispatcher tasks. The caller must hold the write
     * lock because {@code workers} is iterated and cleared.
     */
    private void stopWork() {
        this.executor.shutdown();
        for (Future<?> worker : this.workers.values()) {
            if (!worker.isDone()) {
                worker.cancel(true);
            }
        }
        this.workers.clear();
    }

    /**
     * Registers a listener whose change sets are queued and delivered by its dispatcher task (except for
     * system-workspace change sets, which are always delivered on the notifying thread).
     *
     * @param listener the listener to add; may be null
     * @return {@code true} if the listener was added, {@code false} if it was null or already registered
     */
    @Override
    public boolean register(ChangeSetListener listener) {
        return this.internalRegister(listener, false);
    }

    /**
     * Registers a listener that is always notified on the thread calling {@link #notify(ChangeSet)}.
     *
     * @param listener the listener to add; may be null
     * @return {@code true} if the listener was added, {@code false} if it was null or already registered
     */
    @Override
    public boolean registerInThread(ChangeSetListener listener) {
        return this.internalRegister(listener, true);
    }

    /**
     * Shared implementation of the two registration methods.
     *
     * @param listener the listener to add; may be null
     * @param inThread whether the listener must always be notified on the notifying thread
     * @return {@code true} if a new dispatcher was created and submitted for the listener
     */
    private boolean internalRegister(ChangeSetListener listener, boolean inThread) {
        if (listener == null) {
            return false;
        }
        int hashCode = listenerKey(listener);
        // Cheap rejection of duplicates that only needs the read lock.
        if (this.isRegistered(hashCode)) {
            return false;
        }
        this.listenersLock.writeLock().lock();
        try {
            // Re-check: another thread may have registered the same listener in between.
            if (this.workers.containsKey(hashCode)) {
                return false;
            }
            ChangeSetDispatcher dispatcher = new ChangeSetDispatcher(listener, inThread);
            // Submit before touching the collections so that a rejected submission (e.g. the executor has
            // already been shut down) propagates without leaving a dispatcher behind that has no worker.
            Future<?> worker = this.executor.submit(dispatcher);
            this.workers.put(hashCode, worker);
            this.dispatchers.add(dispatcher);
            return true;
        } finally {
            this.listenersLock.writeLock().unlock();
        }
    }

    /**
     * Removes a listener and cancels (interrupts) its dispatcher task.
     *
     * @param listener the listener to remove; may be null
     * @return {@code true} if the listener was registered and has been removed, {@code false} otherwise
     */
    @Override
    public boolean unregister(ChangeSetListener listener) {
        if (listener == null) {
            return false;
        }
        int hashCode = listenerKey(listener);
        // Cheap rejection of unknown listeners that only needs the read lock.
        if (!this.isRegistered(hashCode)) {
            return false;
        }
        this.listenersLock.writeLock().lock();
        try {
            // Re-check: another thread may have unregistered the listener in between.
            if (!this.workers.containsKey(hashCode)) {
                return false;
            }
            Iterator<ChangeSetDispatcher> dispatcherIterator = this.dispatchers.iterator();
            while (dispatcherIterator.hasNext()) {
                if (dispatcherIterator.next().listenerHashCode() != hashCode) {
                    continue;
                }
                Future<?> work = this.workers.remove(hashCode);
                work.cancel(true);
                dispatcherIterator.remove();
                return true;
            }
            return false;
        } finally {
            this.listenersLock.writeLock().unlock();
        }
    }

    /**
     * Delivers a change set to every registered listener. Does nothing when the change set is null or no
     * listener is registered.
     *
     * @param changeSet the changes to publish; may be null
     * @throws IllegalStateException if listeners are still registered although the bus has been shut down
     */
    @Override
    public void notify(ChangeSet changeSet) {
        if (changeSet == null || !this.hasObservers()) {
            return;
        }
        if (this.shutdown) {
            throw new IllegalStateException("Change bus has been already shut down, should not have any more observers");
        }
        boolean inThread = this.systemWorkspaceName.equalsIgnoreCase(changeSet.getWorkspaceName());
        this.submitChanges(changeSet, inThread);
    }

    /**
     * Hands the change set to every dispatcher, either by calling the listener directly or by queueing it.
     *
     * @param changeSet the changes to deliver
     * @param inThread {@code true} to notify every listener directly on the current thread, regardless of
     *        how it was registered
     */
    private void submitChanges(ChangeSet changeSet, boolean inThread) {
        this.listenersLock.readLock().lock();
        try {
            for (ChangeSetDispatcher dispatcher : this.dispatchers) {
                if (inThread || dispatcher.notifyInSameThread()) {
                    dispatcher.listener().notify(changeSet);
                } else {
                    dispatcher.submit(changeSet);
                }
            }
        } finally {
            this.listenersLock.readLock().unlock();
        }
    }

    /**
     * @return {@code true} if at least one listener is currently registered
     */
    @Override
    public boolean hasObservers() {
        this.listenersLock.readLock().lock();
        try {
            return !this.dispatchers.isEmpty();
        } finally {
            this.listenersLock.readLock().unlock();
        }
    }

    /**
     * Tells whether a worker exists for the given listener key, reading {@code workers} under the read lock.
     */
    private boolean isRegistered(int hashCode) {
        this.listenersLock.readLock().lock();
        try {
            return this.workers.containsKey(hashCode);
        } finally {
            this.listenersLock.readLock().unlock();
        }
    }

    /**
     * Computes the key under which a listener is tracked in {@code workers} and matched against dispatchers.
     */
    private static int listenerKey(ChangeSetListener listener) {
        return HashCode.compute(new Object[]{listener});
    }

    /**
     * The task that serves a single listener: it blocks on its queue and passes each change set it takes to
     * the listener until the bus is shut down or the task is interrupted, then delivers whatever is left in
     * the queue.
     */
    private class ChangeSetDispatcher implements Callable<Void> {
        private final int listenerHashCode;
        private final boolean notifyInSameThread;
        // Both references stay valid for the dispatcher's whole life so that a submit() or listener() call
        // racing with the end of call() can never observe null.
        private final ChangeSetListener listener;
        private final BlockingQueue<ChangeSet> queue;

        protected ChangeSetDispatcher(ChangeSetListener listener, boolean notifyInSameThread) {
            this.listener = listener;
            this.listenerHashCode = listenerKey(listener);
            this.queue = new LinkedBlockingQueue<ChangeSet>();
            this.notifyInSameThread = notifyInSameThread;
        }

        /**
         * Runs the delivery loop.
         *
         * @return always null
         */
        @Override
        public Void call() {
            boolean interrupted = false;
            while (!RepositoryChangeBus.this.shutdown) {
                try {
                    this.listener.notify(this.queue.take());
                } catch (InterruptedException e) {
                    interrupted = true;
                    break;
                }
            }
            this.drainPending();
            if (interrupted) {
                // Restore the status only after draining, so the final listener callbacks do not run on an
                // interrupted thread but the owner of the thread can still see the interruption.
                Thread.currentThread().interrupt();
            }
            return null;
        }

        /**
         * Queues a change set for delivery by {@link #call()}.
         */
        protected void submit(ChangeSet changeSet) {
            if (!this.queue.offer(changeSet)) {
                LOGGER.debug("Cannot submit change set: {0} because the queue is full", new Object[]{changeSet});
            }
        }

        protected int listenerHashCode() {
            return this.listenerHashCode;
        }

        protected ChangeSetListener listener() {
            return this.listener;
        }

        protected boolean notifyInSameThread() {
            return this.notifyInSameThread;
        }

        /**
         * Delivers every change set still waiting in the queue without blocking.
         */
        private void drainPending() {
            ChangeSet pending;
            while ((pending = this.queue.poll()) != null) {
                this.listener.notify(pending);
            }
        }
    }
}

