/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.spi.commit;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Queues;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackgroundObserver
implements Observer,
Closeable {
    private static final ContentChange STOP = new ContentChange(null, null);
    private final Observer observer;
    private final Executor executor;
    private final Thread.UncaughtExceptionHandler exceptionHandler;
    private final BlockingQueue<ContentChange> queue;
    private final int maxQueueLength;
    private ContentChange last;
    private boolean full;
    private volatile ListenableFutureTask currentTask = ListenableFutureTask.completed();
    private final Runnable completionHandler = new Runnable(){
        Callable<Void> task = new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    ContentChange change = (ContentChange)BackgroundObserver.this.queue.poll();
                    if (change != null && change != STOP) {
                        BackgroundObserver.this.observer.contentChanged(change.root, change.info);
                        BackgroundObserver.this.currentTask.onComplete(BackgroundObserver.this.completionHandler);
                    }
                }
                catch (Throwable t) {
                    BackgroundObserver.this.exceptionHandler.uncaughtException(Thread.currentThread(), t);
                }
                return null;
            }
        };

        @Override
        public void run() {
            BackgroundObserver.this.currentTask = new ListenableFutureTask(this.task);
            BackgroundObserver.this.executor.execute(BackgroundObserver.this.currentTask);
        }
    };
    private volatile boolean stopped;

    public BackgroundObserver(@Nonnull Observer observer, @Nonnull Executor executor, int queueLength, @Nonnull Thread.UncaughtExceptionHandler exceptionHandler) {
        this.observer = (Observer)Preconditions.checkNotNull((Object)observer);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.exceptionHandler = (Thread.UncaughtExceptionHandler)Preconditions.checkNotNull((Object)exceptionHandler);
        this.maxQueueLength = queueLength;
        this.queue = Queues.newArrayBlockingQueue((int)this.maxQueueLength);
    }

    public BackgroundObserver(final @Nonnull Observer observer, @Nonnull Executor executor, int queueLength) {
        this(observer, executor, queueLength, new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                BackgroundObserver.getLogger(observer).error("Uncaught exception in " + observer, e);
            }
        });
    }

    public BackgroundObserver(@Nonnull Observer observer, @Nonnull Executor executor) {
        this(observer, executor, 1000);
    }

    protected void added(int queueSize) {
    }

    public int getMaxQueueLength() {
        return this.maxQueueLength;
    }

    @Override
    public synchronized void close() {
        this.queue.clear();
        this.queue.add(STOP);
        this.stopped = true;
    }

    @Nonnull
    public BackgroundObserverMBean getMBean() {
        return new BackgroundObserverMBean(){

            @Override
            public String getClassName() {
                return BackgroundObserver.this.observer.getClass().getName();
            }

            @Override
            public int getQueueSize() {
                return BackgroundObserver.this.queue.size();
            }

            @Override
            public int getMaxQueueSize() {
                return BackgroundObserver.this.getMaxQueueLength();
            }

            @Override
            public int getLocalEventCount() {
                return Iterables.size((Iterable)Iterables.filter((Iterable)BackgroundObserver.this.queue, (Predicate)new Predicate<ContentChange>(){

                    public boolean apply(@Nullable ContentChange input) {
                        return input.info != null;
                    }
                }));
            }

            @Override
            public int getExternalEventCount() {
                return Iterables.size((Iterable)Iterables.filter((Iterable)BackgroundObserver.this.queue, (Predicate)new Predicate<ContentChange>(){

                    public boolean apply(@Nullable ContentChange input) {
                        return input.info == null;
                    }
                }));
            }
        };
    }

    @Override
    public synchronized void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
        Preconditions.checkState((!this.stopped ? 1 : 0) != 0);
        Preconditions.checkNotNull((Object)root);
        if (info == null && this.last != null && this.last.info == null) {
            this.queue.remove(this.last);
            this.full = false;
        }
        ContentChange change = this.full ? new ContentChange(root, null) : new ContentChange(root, info);
        boolean bl = this.full = !this.queue.offer(change);
        if (!this.full) {
            this.last = change;
        }
        this.currentTask.onComplete(this.completionHandler);
        this.added(this.queue.size());
    }

    private static Logger getLogger(@Nonnull Observer observer) {
        return LoggerFactory.getLogger(((Observer)Preconditions.checkNotNull((Object)observer)).getClass());
    }

    private static class ListenableFutureTask
    extends FutureTask<Void> {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private volatile Runnable onComplete;
        private static final Runnable NOP = new Runnable(){

            @Override
            public void run() {
            }
        };

        public ListenableFutureTask(Callable<Void> callable) {
            super(callable);
        }

        public ListenableFutureTask(Runnable task) {
            super(task, null);
        }

        public void onComplete(Runnable onComplete) {
            this.onComplete = onComplete;
            if (this.isDone()) {
                this.run(onComplete);
            }
        }

        @Override
        protected void done() {
            this.run(this.onComplete);
        }

        private void run(Runnable onComplete) {
            if (onComplete != null && this.completed.compareAndSet(false, true)) {
                onComplete.run();
            }
        }

        public static ListenableFutureTask completed() {
            ListenableFutureTask f = new ListenableFutureTask(NOP);
            f.run();
            return f;
        }
    }

    private static class ContentChange {
        private final NodeState root;
        private final CommitInfo info;

        ContentChange(NodeState root, CommitInfo info) {
            this.root = root;
            this.info = info;
        }
    }
}

