/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.spi.commit;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.commons.collections.IterableUtils;
import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
import org.apache.jackrabbit.oak.commons.conditions.Validate;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackgroundObserver
implements Observer,
Closeable {
    public static final int DEFAULT_QUEUE_SIZE = 10000;
    private static final ContentChange STOP = new ContentChange(null, null);
    private final Observer observer;
    private final Executor executor;
    private final Thread.UncaughtExceptionHandler exceptionHandler;
    private final BlockingQueue<ContentChange> queue;
    private final int maxQueueLength;
    private final boolean alwaysCollapseExternalEvents = Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
    private ContentChange last;
    private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed();
    private final Runnable completionHandler = new Runnable(){
        Callable<Void> task = new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    ContentChange change = (ContentChange)BackgroundObserver.this.queue.poll();
                    if (change != null && change != STOP) {
                        BackgroundObserver.this.observer.contentChanged(change.root, change.info);
                        BackgroundObserver.this.removed(BackgroundObserver.this.queue.size(), change.created);
                        BackgroundObserver.this.currentTask.onComplete(BackgroundObserver.this.completionHandler);
                    }
                }
                catch (Throwable t) {
                    BackgroundObserver.this.exceptionHandler.uncaughtException(Thread.currentThread(), t);
                }
                return null;
            }
        };

        @Override
        public void run() {
            BackgroundObserver.this.currentTask = new NotifyingFutureTask(this.task);
            BackgroundObserver.this.executor.execute((Runnable)BackgroundObserver.this.currentTask);
        }
    };
    private volatile boolean stopped;

    public BackgroundObserver(@NotNull Observer observer, @NotNull Executor executor, int queueLength, @NotNull Thread.UncaughtExceptionHandler exceptionHandler) {
        this.observer = Objects.requireNonNull(observer);
        this.executor = Objects.requireNonNull(executor);
        this.exceptionHandler = Objects.requireNonNull(exceptionHandler);
        this.maxQueueLength = queueLength;
        this.queue = new ArrayBlockingQueue<ContentChange>(this.maxQueueLength);
    }

    public BackgroundObserver(final @NotNull Observer observer, @NotNull Executor executor, int queueLength) {
        this(observer, executor, queueLength, new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                BackgroundObserver.getLogger(observer).error("Uncaught exception in " + String.valueOf(observer), e);
            }
        });
    }

    public BackgroundObserver(@NotNull Observer observer, @NotNull Executor executor) {
        this(observer, executor, 10000);
    }

    protected void added(int queueSize) {
    }

    protected void removed(int queueSize, long created) {
    }

    public int getMaxQueueLength() {
        return this.maxQueueLength;
    }

    @Override
    public synchronized void close() {
        this.queue.clear();
        this.queue.add(STOP);
        this.stopped = true;
    }

    @NotNull
    public BackgroundObserverMBean getMBean() {
        return new BackgroundObserverMBean(){

            @Override
            public String getClassName() {
                return BackgroundObserver.this.observer.getClass().getName();
            }

            @Override
            public int getQueueSize() {
                return BackgroundObserver.this.queue.size();
            }

            @Override
            public int getMaxQueueSize() {
                return BackgroundObserver.this.getMaxQueueLength();
            }

            @Override
            public int getLocalEventCount() {
                return IterableUtils.size((Iterable)IterableUtils.filter(BackgroundObserver.this.queue, input -> !input.info.isExternal()));
            }

            @Override
            public int getExternalEventCount() {
                return IterableUtils.size((Iterable)IterableUtils.filter(BackgroundObserver.this.queue, input -> input.info.isExternal()));
            }
        };
    }

    @Override
    public synchronized void contentChanged(@NotNull NodeState root, @NotNull CommitInfo info) {
        ContentChange change;
        boolean full;
        Validate.checkState((!this.stopped ? 1 : 0) != 0);
        Objects.requireNonNull(root);
        Objects.requireNonNull(info);
        if (this.alwaysCollapseExternalEvents && info.isExternal() && this.last != null && this.last.info.isExternal()) {
            this.queue.remove(this.last);
        }
        boolean bl = full = !this.queue.offer(change = new ContentChange(root, info));
        if (full && this.last != null) {
            this.queue.remove(this.last);
            change = new ContentChange(root, CommitInfo.EMPTY_EXTERNAL);
            this.queue.offer(change);
        }
        this.last = change;
        this.currentTask.onComplete(this.completionHandler);
        this.added(this.queue.size());
    }

    private static Logger getLogger(@NotNull Observer observer) {
        return LoggerFactory.getLogger(Objects.requireNonNull(observer).getClass());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean waitUntilStopped(int timeout, TimeUnit unit) throws InterruptedException {
        long done = System.currentTimeMillis() + unit.toMillis(timeout);
        boolean added = false;
        while (done > System.currentTimeMillis()) {
            BackgroundObserver backgroundObserver = this;
            synchronized (backgroundObserver) {
                if (!added && (added = this.queue.offer(STOP))) {
                    this.currentTask.onComplete(this.completionHandler);
                }
                if (added && this.queue.size() == 0) {
                    return true;
                }
                this.wait(1L);
            }
        }
        return false;
    }

    private static class ContentChange {
        private final NodeState root;
        private final CommitInfo info;
        private final long created = System.currentTimeMillis();

        ContentChange(NodeState root, CommitInfo info) {
            this.root = root;
            this.info = info;
        }
    }
}

