/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.jcr.observation;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Monitor;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.api.stats.RepositoryStatistics;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.jcr.observation.EventQueue;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
import org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.stats.TimeSeriesMax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link Observer} that turns repository content changes into JCR observation
 * events and delivers them to the {@link EventListener} tracked by a
 * {@link ListenerTracker}.
 *
 * <p>Life cycle: {@link #start(Whiteboard)} registers a {@link BackgroundObserver}
 * (plus a few MBeans and a periodic task) on a whiteboard; {@link #stop()} or
 * {@link #stopAndWait(int, TimeUnit)} removes those registrations again.
 * {@code start}, {@code stop} and {@code stopAndWait} are {@code synchronized} on
 * this instance. Event delivery in {@link #contentChanged(NodeState, CommitInfo)}
 * runs while occupying {@code runningMonitor}, which is what
 * {@code stopAndWait} waits on.
 */
class ChangeProcessor
implements Observer {
    private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);

    /**
     * Fill ratio of the revision queue above which commits start being delayed
     * (only when a {@link CommitRateLimiter} is configured).
     */
    public static final double DELAY_THRESHOLD = 0.8;

    /**
     * Scale of the commit delay: the value handed to
     * {@link CommitRateLimiter#setDelay(long)} grows linearly with the fill ratio
     * from 1 towards this value. Units are whatever {@code setDelay} expects
     * (presumably milliseconds; confirm against {@code CommitRateLimiter}).
     */
    public static final int MAX_DELAY = 10000;

    /** Source of the unique {@link #LISTENER_ID} attribute values. */
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Attribute key under which the listener's unique id is registered with the MBeans. */
    static final String LISTENER_ID = "listenerId";

    private final ContentSession contentSession;
    private final NamePathMapper namePathMapper;
    private final ListenerTracker tracker;
    private final EventListener eventListener;
    private final AtomicReference<FilterProvider> filterProvider;
    private final AtomicLong eventCount;
    private final AtomicLong eventDuration;
    private final TimeSeriesMax maxQueueLength;
    private final int queueLength;
    private final CommitRateLimiter commitRateLimiter;

    /** Everything registered by {@link #start(Whiteboard)}; {@code null} until started. Guarded by {@code this}. */
    private CompositeRegistration registration;

    /** {@code true} once {@code registration.unregister()} has been invoked. Guarded by {@code this}. */
    private boolean unregistered;

    /** Root state seen by the previous {@link #contentChanged} call; {@code null} before the first call. */
    private volatile NodeState previousRoot;

    // NOTE: runningMonitor must be initialised before running, which captures it.
    private final Monitor runningMonitor = new Monitor();
    private final RunningGuard running = new RunningGuard(this.runningMonitor);

    /**
     * @param contentSession     session whose string form is passed to
     *                           {@link FilterProvider#includeCommit}
     * @param namePathMapper     mapper handed to the generated event queue
     * @param tracker            tracker wrapping the listener to deliver events to
     * @param filter             initial filter provider
     * @param statisticManager   source of the observation counters updated on delivery
     * @param queueLength        capacity of the background observer's revision queue
     * @param commitRateLimiter  limiter used to delay/block commits when the queue
     *                           fills up; may be {@code null} to disable throttling
     */
    public ChangeProcessor(ContentSession contentSession, NamePathMapper namePathMapper, ListenerTracker tracker, FilterProvider filter, StatisticManager statisticManager, int queueLength, CommitRateLimiter commitRateLimiter) {
        this.contentSession = contentSession;
        this.namePathMapper = namePathMapper;
        this.tracker = tracker;
        this.eventListener = tracker.getTrackedListener();
        this.filterProvider = new AtomicReference<FilterProvider>(filter);
        this.eventCount = statisticManager.getCounter(RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER);
        this.eventDuration = statisticManager.getCounter(RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION);
        this.maxQueueLength = statisticManager.maxQueLengthRecorder();
        this.queueLength = queueLength;
        this.commitRateLimiter = commitRateLimiter;
    }

    /**
     * Replaces the filter provider. The new provider is picked up by the next
     * {@link #contentChanged(NodeState, CommitInfo)} call.
     *
     * @param filter the new filter provider
     */
    public void setFilterProvider(FilterProvider filter) {
        this.filterProvider.set(filter);
    }

    /**
     * Starts this change processor by registering its background observer, its
     * MBeans and a task recurring with a fixed delay of 1 on the given whiteboard.
     *
     * @param whiteboard the whiteboard to register with
     * @throws IllegalStateException if this processor has been started before
     */
    public synchronized void start(Whiteboard whiteboard) {
        Preconditions.checkState(this.registration == null, "Change processor started already");
        final WhiteboardExecutor executor = new WhiteboardExecutor();
        executor.start(whiteboard);
        final BackgroundObserver observer = this.createObserver(executor);
        Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, String.valueOf(COUNTER.incrementAndGet()));
        String name = this.tracker.toString();
        this.registration = new CompositeRegistration(
                WhiteboardUtils.registerObserver(whiteboard, observer),
                WhiteboardUtils.registerMBean(whiteboard, EventListenerMBean.class, this.tracker.getListenerMBean(), "EventListener", name, attrs),
                WhiteboardUtils.registerMBean(whiteboard, BackgroundObserverMBean.class, observer.getMBean(), "BackgroundObserverStats", name, attrs),
                WhiteboardUtils.registerMBean(whiteboard, FilterConfigMBean.class, this.filterProvider.get().getConfigMBean(), "FilterConfig", name, attrs),
                new Registration() {
                    @Override
                    public void unregister() {
                        observer.close();
                    }
                },
                new Registration() {
                    @Override
                    public void unregister() {
                        executor.stop();
                    }
                },
                WhiteboardUtils.scheduleWithFixedDelay(whiteboard, new Runnable() {
                    @Override
                    public void run() {
                        ChangeProcessor.this.tracker.recordOneSecond();
                    }
                }, 1L));
    }

    /**
     * Creates the background observer feeding this processor. Its {@code added}
     * hook records queue statistics and, when a {@link CommitRateLimiter} is
     * configured, throttles commits depending on how full the queue is:
     * <ul>
     *   <li>queue full: commits are blocked;</li>
     *   <li>fill ratio above {@link #DELAY_THRESHOLD}: commits are delayed, the
     *       delay only ever being raised while above the threshold;</li>
     *   <li>otherwise: any delay is reset and commits are unblocked.</li>
     * </ul>
     */
    private BackgroundObserver createObserver(WhiteboardExecutor executor) {
        return new BackgroundObserver(this, executor, this.queueLength) {
            /** Delay currently set on the rate limiter; 0 when not delaying. */
            private volatile long delay;
            /** Whether the "queue full" state has been entered and not yet left. */
            private volatile boolean blocking;

            @Override
            protected void added(int queueSize) {
                ChangeProcessor.this.maxQueueLength.recordValue(queueSize);
                ChangeProcessor.this.tracker.recordQueueLength(queueSize);

                final CommitRateLimiter limiter = ChangeProcessor.this.commitRateLimiter;
                if (queueSize == ChangeProcessor.this.queueLength) {
                    // Queue is full. Warn only on the transition into this state.
                    if (limiter != null) {
                        if (!this.blocking) {
                            LOG.warn("Revision queue is full. Further commits will be blocked.");
                        }
                        limiter.blockCommits();
                    } else if (!this.blocking) {
                        LOG.warn("Revision queue is full. Further revisions will be compacted.");
                    }
                    this.blocking = true;
                    return;
                }

                double fillRatio = (double) queueSize / (double) ChangeProcessor.this.queueLength;
                if (fillRatio > ChangeProcessor.DELAY_THRESHOLD) {
                    if (limiter != null) {
                        if (this.delay == 0L) {
                            LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
                        }
                        // Linear ramp from 1 towards MAX_DELAY across the range (DELAY_THRESHOLD, 1).
                        int newDelay = 1 + (int) ((fillRatio - ChangeProcessor.DELAY_THRESHOLD)
                                / (1 - ChangeProcessor.DELAY_THRESHOLD) * ChangeProcessor.MAX_DELAY);
                        if ((long) newDelay > this.delay) {
                            this.delay = newDelay;
                            limiter.setDelay(this.delay);
                        }
                    }
                } else if (limiter != null) {
                    if (this.delay > 0L) {
                        LOG.debug("Revision queue becoming empty. Stop delaying commits.");
                        limiter.setDelay(0L);
                        this.delay = 0L;
                    }
                    if (this.blocking) {
                        LOG.debug("Revision queue becoming empty. Unblocking commits");
                        limiter.unblockCommits();
                        this.blocking = false;
                    }
                }
            }
        };
    }

    /**
     * Stops this change processor and waits up to the given time for a delivery
     * that is currently in progress to finish before removing the registrations
     * made by {@link #start(Whiteboard)}.
     *
     * <p>If the wait times out, no registrations are removed by this call; call
     * {@link #stop()} afterwards to remove them without waiting.
     *
     * @param timeOut maximum time to wait for the running monitor
     * @param unit    unit of {@code timeOut}
     * @return {@code true} if the processor was already stopped or was stopped
     *         within the time out, {@code false} if waiting timed out
     * @throws IllegalStateException if this processor has not been started
     */
    public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (!this.running.stop()) {
            // Stopped already.
            return true;
        }
        if (!this.runningMonitor.enter(timeOut, unit)) {
            // Timed out.
            return false;
        }
        try {
            this.unregisterOnce();
        } finally {
            // Always release the monitor, even if unregistering fails.
            this.runningMonitor.leave();
        }
        return true;
    }

    /**
     * Stops this change processor without waiting for a delivery in progress and
     * removes the registrations made by {@link #start(Whiteboard)} unless that has
     * happened already. This also completes the clean-up after a
     * {@link #stopAndWait(int, TimeUnit)} call that timed out.
     *
     * @throws IllegalStateException if this processor has not been started
     */
    public synchronized void stop() {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        this.running.stop();
        // The monitor is not entered on this path, so it must not be left here either.
        this.unregisterOnce();
    }

    /** Invokes {@code registration.unregister()} at most once. Caller must hold the lock on {@code this}. */
    private void unregisterOnce() {
        if (!this.unregistered) {
            this.unregistered = true;
            this.registration.unregister();
        }
    }

    /**
     * Generates the events for the change from the previously seen root to
     * {@code root} and hands them to the listener, provided the filter provider
     * includes the commit, there is at least one event and this processor has not
     * been stopped. Exceptions raised while doing so are logged and not
     * propagated. The first call only records {@code root}.
     *
     * @param root the new root state
     * @param info information about the commit, passed on to the filter provider
     *             and the event queue
     */
    @Override
    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
        NodeState before = this.previousRoot;
        if (before != null) {
            try {
                FilterProvider provider = this.filterProvider.get();
                if (provider.includeCommit(this.contentSession.toString(), info)) {
                    EventFilter filter = provider.getFilter(before, root);
                    EventQueue events = new EventQueue(this.namePathMapper, info, before, root, provider.getSubTrees(), Filters.all(filter, VisibleFilter.VISIBLE_FILTER));
                    // enterIf only occupies the monitor while the guard reports "not stopped".
                    if (events.hasNext() && this.runningMonitor.enterIf(this.running)) {
                        try {
                            CountingIterator countingEvents = new CountingIterator(events);
                            this.eventListener.onEvent(countingEvents);
                            countingEvents.updateCounters(this.eventCount, this.eventDuration);
                        } finally {
                            this.runningMonitor.leave();
                        }
                    }
                }
            } catch (Exception e) {
                LOG.warn("Error while dispatching observation events for " + this.tracker, e);
            }
        }
        this.previousRoot = root;
    }

    /** Monitor guard that is satisfied until {@link #stop()} has been called. */
    private static class RunningGuard
    extends Monitor.Guard {
        // Written by stop() without occupying the monitor and read by
        // isSatisfied(); volatile makes the write visible to the reader.
        private volatile boolean stopped;

        public RunningGuard(Monitor monitor) {
            super(monitor);
        }

        @Override
        public boolean isSatisfied() {
            return !this.stopped;
        }

        /**
         * Marks this guard as stopped. Callers in this class invoke it while
         * holding the {@code ChangeProcessor} instance lock.
         *
         * @return {@code true} if this call set the guard to stopped,
         *         {@code false} if it had been stopped before
         */
        public boolean stop() {
            boolean wasStopped = this.stopped;
            this.stopped = true;
            return !wasStopped;
        }
    }

    /**
     * Event iterator wrapper that counts the events handed out and measures the
     * time spent inside the wrapped iterator, so that this time can be subtracted
     * from the total duration of the listener callback.
     */
    private static class CountingIterator
    implements EventIterator {
        /** Creation time of this iterator. */
        private final long t0 = System.nanoTime();
        private final EventIterator events;
        /**
         * Number of events handed out so far; {@code -1} after
         * {@link #updateCounters} has run, {@code -2} once the late-access
         * warning has been logged.
         */
        private long eventCount;
        /** Nanoseconds spent in the wrapped iterator's hasNext/nextEvent/skip. */
        private long sysTime;

        public CountingIterator(EventIterator events) {
            this.events = events;
        }

        /**
         * Adds the number of events handed out to {@code eventCount} and the time
         * elapsed since creation, minus the time spent in the wrapped iterator,
         * to {@code eventDuration}. Must be called at most once.
         *
         * @throws IllegalStateException if called a second time
         */
        public void updateCounters(AtomicLong eventCount, AtomicLong eventDuration) {
            Preconditions.checkState(this.eventCount >= 0L);
            eventCount.addAndGet(this.eventCount);
            eventDuration.addAndGet(System.nanoTime() - this.t0 - this.sysTime);
            this.eventCount = -1L;
        }

        @Override
        public Event next() {
            if (this.eventCount == -1L) {
                LOG.warn("Access to EventIterator outside the onEvent callback detected. This will cause observation related values in RepositoryStatistics to become unreliable.");
                this.eventCount = -2L;
            }
            long start = System.nanoTime();
            try {
                Event event = this.events.nextEvent();
                // Count only events actually handed out, and leave the negative
                // sentinels alone so that the warning above is logged just once.
                if (this.eventCount >= 0L) {
                    ++this.eventCount;
                }
                return event;
            } finally {
                this.sysTime += System.nanoTime() - start;
            }
        }

        @Override
        public boolean hasNext() {
            long start = System.nanoTime();
            try {
                return this.events.hasNext();
            } finally {
                this.sysTime += System.nanoTime() - start;
            }
        }

        @Override
        public Event nextEvent() {
            return this.next();
        }

        @Override
        public void skip(long skipNum) {
            long start = System.nanoTime();
            try {
                this.events.skip(skipNum);
            } finally {
                this.sysTime += System.nanoTime() - start;
            }
        }

        @Override
        public long getSize() {
            return this.events.getSize();
        }

        @Override
        public long getPosition() {
            return this.events.getPosition();
        }

        /** Always throws; removal is not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

