/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.jcr.observation;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Monitor;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.api.stats.RepositoryStatistics;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.jcr.observation.ChangeProcessorMBean;
import org.apache.jackrabbit.oak.jcr.observation.EventQueue;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.observation.Filter;
import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher;
import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
import org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.observation.ChangeSet;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.stats.TimeSeriesMax;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ChangeProcessor
implements FilteringAwareObserver {
    /** General-purpose logger of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
    /** Performance logger (logger name is this class' name plus ".perf"); used to time event generation in contentChanged. */
    private static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger((String)(ChangeProcessor.class.getName() + ".perf")));
    /**
     * Queue fill ratio above which commits start being delayed (only when a commit rate limiter is set).
     * Initialised in the static initializer from the system property
     * "oak.commitRateLimiter.delayThreshold"; defaults to 0.8.
     */
    public static final double DELAY_THRESHOLD;
    /**
     * Scale factor for the delay handed to the commit rate limiter once the fill ratio exceeds
     * {@link #DELAY_THRESHOLD}. Initialised from the system property "oak.commitRateLimiter.maxDelay";
     * defaults to 10000 (the static initializer logs the value with an "ms" suffix).
     */
    public static final int MAX_DELAY;
    /**
     * Minimum time in milliseconds between two WARN-level "queue is full" messages; in between such
     * messages are logged at DEBUG. Derived from "oak.observation.full-queue.warn.interval" (minutes, default 30).
     * NOTE(review): non-final and package-private, presumably so tests can override it - confirm.
     */
    static long QUEUE_FULL_WARN_INTERVAL;
    /** Time source used for throttling the "queue is full" warning; set to Clock.SIMPLE in the static initializer. */
    static Clock clock;
    /** Source of the numeric listener ids assigned in start(). */
    private static final AtomicInteger COUNTER;
    /** Attribute key under which the listener id is passed to the MBean registrations in start(). */
    static final String LISTENER_ID = "listenerId";
    private final ContentSession contentSession;
    private final NamePathMapper namePathMapper;
    /** Tracker wrapping the listener; also receives queue length and producer time measurements. */
    private final ListenerTracker tracker;
    /** The tracked listener obtained from {@link #tracker}; events are delivered to it. */
    private final EventListener eventListener;
    /** Current filter provider; replaceable at any time through setFilterProvider(). */
    private final AtomicReference<FilterProvider> filterProvider;
    private final MeterStats eventCount;
    private final TimerStats eventDuration;
    private final TimeSeriesMax maxQueueLengthRecorder;
    /** Capacity passed to the background observer; also the "queue full" threshold. */
    private final int queueLength;
    /** May be null, in which case a full queue is only logged and commits are neither delayed nor blocked. */
    private final CommitRateLimiter commitRateLimiter;
    private final BlobAccessProvider blobAccessProvider;
    /** Assigned in start(); null before that. */
    private String listenerId;
    /** Non-null once start() has completed; used as the "started" marker by start(), stop() and stopAndWait(). */
    private CompositeRegistration registration;
    // Prefilter statistics exposed through getMBean(). These are plain int fields that are
    // incremented by the prefilter created in createObserver() without any synchronization.
    private int prefilterExcludeCount;
    private int prefilterIncludeCount;
    private int prefilterSkipCount;
    /** Monitor occupied by contentChanged() while events are being delivered to the listener. */
    private final Monitor runningMonitor = new Monitor();
    /** Guard on {@link #runningMonitor} that is satisfied until a stop has been requested. */
    private final RunningGuard running = new RunningGuard(this.runningMonitor);

    /**
     * Creates a change processor that is not yet started; call {@link #start(Whiteboard)} to
     * register it.
     *
     * @param contentSession     session whose string form is passed to the filter provider's commit check
     * @param namePathMapper     mapper handed to the event queue
     * @param tracker            tracker wrapping the listener that receives the events
     * @param filter             initial filter provider
     * @param statisticManager   source of the event counter, event duration timer and queue length recorder
     * @param queueLength        capacity of the background observer queue
     * @param commitRateLimiter  limiter used to delay/block commits when the queue fills up; may be {@code null}
     * @param blobAccessProvider blob access provider handed to the event queue
     */
    public ChangeProcessor(ContentSession contentSession, NamePathMapper namePathMapper, ListenerTracker tracker, FilterProvider filter, StatisticManager statisticManager, int queueLength, CommitRateLimiter commitRateLimiter, BlobAccessProvider blobAccessProvider) {
        // Plain collaborators.
        this.contentSession = contentSession;
        this.namePathMapper = namePathMapper;
        this.blobAccessProvider = blobAccessProvider;
        this.commitRateLimiter = commitRateLimiter;
        this.queueLength = queueLength;
        this.filterProvider = new AtomicReference<>(filter);

        // Listener and its tracker.
        this.tracker = tracker;
        this.eventListener = tracker.getTrackedListener();

        // Statistics sinks.
        this.eventCount = statisticManager.getMeter(RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER);
        this.eventDuration = statisticManager.getTimer(RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION);
        this.maxQueueLengthRecorder = statisticManager.maxQueLengthRecorder();
    }

    /**
     * Replaces the filter provider used for subsequently processed changes.
     *
     * @param filter the new filter provider
     */
    public void setFilterProvider(FilterProvider filter) {
        AtomicReference<FilterProvider> holder = this.filterProvider;
        holder.set(filter);
    }

    /**
     * @return the filter provider currently in effect
     */
    FilterProvider getFilterProvider() {
        FilterProvider current = this.filterProvider.get();
        return current;
    }

    /**
     * Creates a new MBean view onto the prefilter counters of this processor. Each getter reads
     * the corresponding counter field at call time.
     *
     * @return a fresh MBean instance, never {@code null}
     */
    @NotNull
    public ChangeProcessorMBean getMBean() {
        ChangeProcessorMBean counters = new ChangeProcessorMBean() {

            @Override
            public int getPrefilterSkipCount() {
                return ChangeProcessor.this.prefilterSkipCount;
            }

            @Override
            public int getPrefilterIncludeCount() {
                return ChangeProcessor.this.prefilterIncludeCount;
            }

            @Override
            public int getPrefilterExcludeCount() {
                return ChangeProcessor.this.prefilterExcludeCount;
            }
        };
        return counters;
    }

    /**
     * Starts this change processor: starts an executor, creates the observer chain and registers
     * the observer, the MBeans and a periodic tracker task with the given whiteboard.
     *
     * @param whiteboard the whiteboard to register with
     * @throws IllegalStateException if this processor has been started before
     */
    public synchronized void start(Whiteboard whiteboard) {
        Preconditions.checkState(this.registration == null, "Change processor started already");

        final WhiteboardExecutor executor = new WhiteboardExecutor();
        executor.start(whiteboard);
        final FilteringObserver observer = this.createObserver(executor);

        this.listenerId = Integer.toString(COUNTER.incrementAndGet());
        Map<String, String> attributes = ImmutableMap.of(LISTENER_ID, this.listenerId);
        String name = this.tracker.toString();

        // The registrations are created in exactly the order in which they are unregistered-as-a-group.
        Registration observerRegistration = whiteboard.register(Observer.class, observer, Collections.emptyMap());
        Registration listenerMBean = WhiteboardUtils.registerMBean(whiteboard, EventListenerMBean.class, this.tracker.getListenerMBean(), "EventListener", name, attributes);
        Registration backgroundObserverMBean = WhiteboardUtils.registerMBean(whiteboard, BackgroundObserverMBean.class, observer.getBackgroundObserver().getMBean(), "BackgroundObserverStats", name, attributes);
        Registration changeProcessorMBean = WhiteboardUtils.registerMBean(whiteboard, ChangeProcessorMBean.class, this.getMBean(), "ChangeProcessorStats", name, attributes);
        Registration filterConfigMBean = WhiteboardUtils.registerMBean(whiteboard, FilterConfigMBean.class, this.filterProvider.get().getConfigMBean(), "FilterConfig", name, attributes);
        Registration closeObserver = new Registration() {

            @Override
            public void unregister() {
                observer.close();
            }
        };
        Registration stopExecutor = new Registration() {

            @Override
            public void unregister() {
                executor.stop();
            }
        };
        // Periodically advances the tracker's time series (delay argument of 1).
        Registration trackerTick = WhiteboardUtils.scheduleWithFixedDelay(whiteboard, new Runnable() {

            @Override
            public void run() {
                ChangeProcessor.this.tracker.recordOneSecond();
            }
        }, 1L);

        this.registration = new CompositeRegistration(observerRegistration, listenerMBean, backgroundObserverMBean, changeProcessorMBean, filterConfigMBean, closeObserver, stopExecutor, trackerTick);
    }

    /**
     * Builds the observer chain of this processor: a {@link FilteringDispatcher} around this
     * instance, fed by a {@link BackgroundObserver} with a queue of {@code queueLength} entries,
     * wrapped in a {@link FilteringObserver} whose prefilter delegates to {@code evalPrefilter}.
     *
     * <p>The background observer reacts to queue size changes: it records the size, and - when a
     * commit rate limiter is present - delays commits once the fill ratio exceeds
     * {@link #DELAY_THRESHOLD} and blocks them once the queue is full.
     *
     * @param executor executor the background observer runs on
     * @return the observer to register with the whiteboard
     */
    private FilteringObserver createObserver(WhiteboardExecutor executor) {
        FilteringDispatcher dispatcher = new FilteringDispatcher(this);
        BackgroundObserver backgroundObserver = new BackgroundObserver(dispatcher, executor, this.queueLength) {
            /** Delay last handed to the commit rate limiter; 0 when commits are not delayed. */
            private volatile long delay;
            /** Whether the queue has been seen full and not yet dropped below the delay threshold. */
            private volatile boolean blocking;
            /** Clock time of the last WARN-level "queue full" message; -1 if none was logged yet. */
            private long lastQueueFullWarnTimestamp = -1L;

            @Override
            protected void added(int newQueueSize) {
                this.queueSizeChanged(newQueueSize);
            }

            @Override
            protected void removed(int newQueueSize, long created) {
                this.queueSizeChanged(newQueueSize);
            }

            /** Records the new queue size and adjusts commit delaying/blocking accordingly. */
            private void queueSizeChanged(int newQueueSize) {
                ChangeProcessor.this.maxQueueLengthRecorder.recordValue(newQueueSize);
                ChangeProcessor.this.tracker.recordQueueLength(newQueueSize);

                if (newQueueSize >= ChangeProcessor.this.queueLength) {
                    // Queue is full: block commits if we can, otherwise just report it.
                    if (ChangeProcessor.this.commitRateLimiter != null) {
                        if (!this.blocking) {
                            this.logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
                        }
                        ChangeProcessor.this.commitRateLimiter.blockCommits();
                    } else if (!this.blocking) {
                        this.logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
                    }
                    this.blocking = true;
                    return;
                }

                double fillRatio = (double) newQueueSize / (double) ChangeProcessor.this.queueLength;
                if (fillRatio > DELAY_THRESHOLD) {
                    if (ChangeProcessor.this.commitRateLimiter != null) {
                        if (this.delay == 0L) {
                            LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
                        }
                        // Grows linearly from 1 at the threshold towards MAX_DELAY at a full queue;
                        // the delay is only ever raised here, never lowered.
                        int newDelay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / (1.0 - DELAY_THRESHOLD) * (double) MAX_DELAY);
                        if ((long) newDelay > this.delay) {
                            this.delay = newDelay;
                            ChangeProcessor.this.commitRateLimiter.setDelay(this.delay);
                        }
                    }
                } else if (ChangeProcessor.this.commitRateLimiter != null) {
                    // Below the threshold again: undo delaying and blocking. Each message is
                    // logged next to the action it describes.
                    if (this.delay > 0L) {
                        LOG.debug("Revision queue becoming empty. Stop delaying commits.");
                        ChangeProcessor.this.commitRateLimiter.setDelay(0L);
                        this.delay = 0L;
                    }
                    if (this.blocking) {
                        LOG.debug("Revision queue becoming empty. Unblocking commits");
                        ChangeProcessor.this.commitRateLimiter.unblockCommits();
                        this.blocking = false;
                    }
                } else {
                    this.blocking = false;
                }
            }

            /**
             * Logs the message at WARN if more than QUEUE_FULL_WARN_INTERVAL milliseconds have
             * passed since the last such warning, otherwise at DEBUG.
             */
            private void logQueueFullWarning(String message) {
                long currTime = clock.getTime();
                if (this.lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) {
                    LOG.warn("{} Suppressing further such cases for {} minutes.", message, TimeUnit.MILLISECONDS.toMinutes(QUEUE_FULL_WARN_INTERVAL));
                    this.lastQueueFullWarnTimestamp = currTime;
                } else {
                    LOG.debug(message);
                }
            }

            @Override
            public String toString() {
                return "Prefiltering BackgroundObserver for " + ChangeProcessor.this;
            }
        };

        return new FilteringObserver(backgroundObserver, new Filter() {

            /** Evaluates the prefilter, updates the matching counter and excludes only on EXCLUDE. */
            @Override
            public boolean excludes(NodeState root, CommitInfo info) {
                FilterResult filterResult = ChangeProcessor.this.evalPrefilter(root, info, ChangeProcessor.getChangeSet(info));
                switch (filterResult) {
                    case PREFILTERING_SKIPPED:
                        ChangeProcessor.this.prefilterSkipCount++;
                        return false;
                    case EXCLUDE:
                        ChangeProcessor.this.prefilterExcludeCount++;
                        return true;
                    case INCLUDE:
                        ChangeProcessor.this.prefilterIncludeCount++;
                        return false;
                    default:
                        // Unknown result: behave as if prefiltering had been skipped.
                        LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult);
                        ChangeProcessor.this.prefilterSkipCount++;
                        return false;
                }
            }
        });
    }

    /**
     * Requests this change processor to stop and waits up to the given time for a running event
     * delivery to finish before unregistering.
     *
     * @param timeOut maximum time to wait for the running monitor
     * @param unit    unit of {@code timeOut}
     * @return {@code true} if the processor was stopped by this call or had been stopped before;
     *         {@code false} if the monitor could not be entered within the time out (the stop
     *         request stays in effect, but nothing has been unregistered)
     * @throws IllegalStateException if this processor has not been started
     */
    public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (!this.running.stop()) {
            // Stopped already.
            return true;
        }
        if (!this.runningMonitor.enter((long) timeOut, unit)) {
            // Timed out.
            return false;
        }
        try {
            this.registration.unregister();
        } finally {
            // Always release the monitor, even if unregistering fails; otherwise it would stay
            // occupied by this thread.
            this.runningMonitor.leave();
        }
        return true;
    }

    /**
     * Requests this change processor to stop and unregisters it, without waiting for an event
     * delivery that may currently be in progress.
     *
     * <p>Does nothing if a stop has already been requested.
     *
     * @throws IllegalStateException if this processor has not been started
     */
    public synchronized void stop() {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (this.running.stop()) {
            // This method never enters runningMonitor, so it must not leave it either.
            // Monitor.leave() may only be called by the thread occupying the monitor: from any
            // other thread it throws IllegalMonitorStateException, and from within a listener
            // callback it would release the hold that contentChanged() releases itself.
            this.registration.unregister();
        }
    }

    /**
     * Extracts the change set attached to a commit, if any.
     *
     * @param info the commit info, may be {@code null}
     * @return the value stored under "oak.observation.changeSet" in the commit context found
     *         under "oak.commitAttributes", or {@code null} if the info or the context is missing
     */
    public static ChangeSet getChangeSet(CommitInfo info) {
        if (info != null) {
            CommitContext commitContext = (CommitContext) info.getInfo().get("oak.commitAttributes");
            if (commitContext != null) {
                return (ChangeSet) commitContext.get("oak.observation.changeSet");
            }
        }
        return null;
    }

    /**
     * Generates the observation events for the given content change and delivers them to the
     * listener, provided the current filter provider includes the commit.
     *
     * <p>Any exception raised while generating or delivering events is logged at WARN and not
     * propagated.
     *
     * @param before state before the change
     * @param after  state after the change
     * @param info   commit info of the change
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void contentChanged(@NotNull NodeState before, @NotNull NodeState after, @NotNull CommitInfo info) {
        Preconditions.checkNotNull(before);
        Preconditions.checkNotNull(after);
        Preconditions.checkNotNull(info);
        try {
            long perfStart = PERF_LOGGER.start();
            FilterProvider provider = this.filterProvider.get();
            if (provider.includeCommit(this.contentSession.toString(), info)) {
                this.deliverEvents(provider, before, after, info);
            }
            PERF_LOGGER.end(perfStart, 100L, "Generated events (before: {}, after: {})", (Object) before, (Object) after);
        } catch (Exception e) {
            LOG.warn("Error while dispatching observation events for " + this.tracker, (Throwable) e);
        }
    }

    /**
     * Builds the event queue for one change and, if it contains events and this processor has
     * not been stopped, hands it to the listener while occupying the running monitor.
     */
    private void deliverEvents(FilterProvider provider, NodeState before, NodeState after, CommitInfo info) {
        EventFilter filter = provider.getFilter(before, after);
        EventQueue events = new EventQueue(this.namePathMapper, this.blobAccessProvider, info, before, after, provider.getSubTrees(), Filters.all(filter, VisibleFilter.VISIBLE_FILTER), provider.getEventAggregator());

        // The time spent in the first hasNext() call is recorded as producer time.
        long producerStart = System.nanoTime();
        boolean hasEvents = events.hasNext();
        this.tracker.recordProducerTime(System.nanoTime() - producerStart, TimeUnit.NANOSECONDS);

        // Only enter the monitor when there is something to deliver and no stop was requested.
        if (!hasEvents || !this.runningMonitor.enterIf(this.running)) {
            return;
        }

        boolean rateLimited = this.commitRateLimiter != null;
        if (rateLimited) {
            this.commitRateLimiter.beforeNonBlocking();
        }
        try {
            CountingIterator countingEvents = new CountingIterator(events);
            this.eventListener.onEvent(countingEvents);
            countingEvents.updateCounters(this.eventCount, this.eventDuration);
        } finally {
            if (rateLimited) {
                this.commitRateLimiter.afterNonBlocking();
            }
            this.runningMonitor.leave();
        }
    }

    /**
     * @return a diagnostic description listing the listener id, tracker, content session,
     *         statistics, commit rate limiter and whether the processor is still running
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ChangeProcessor [listenerId=");
        text.append(this.listenerId);
        text.append(", tracker=").append(this.tracker);
        text.append(", contentSession=").append(this.contentSession);
        text.append(", eventCount=").append(this.eventCount);
        text.append(", eventDuration=").append(this.eventDuration);
        text.append(", commitRateLimiter=").append(this.commitRateLimiter);
        text.append(", running=").append(this.running.isSatisfied());
        text.append("]");
        return text.toString();
    }

    /**
     * @return the string form reported by the listener MBean of the tracker, "null" if there is
     *         no tracker, or "null (no listener mbean)" if the tracker has no listener MBean
     */
    String getListenerToString() {
        if (this.tracker != null) {
            EventListenerMBean mbean = this.tracker.getListenerMBean();
            return mbean != null ? mbean.getToString() : "null (no listener mbean)";
        }
        return "null";
    }

    /**
     * Decides, before any events are generated, whether a commit can be excluded.
     *
     * @param root      root state of the commit, may be {@code null}
     * @param info      commit info, may be {@code null}
     * @param changeSet change set of the commit, may be {@code null}
     * @return {@code PREFILTERING_SKIPPED} if {@code info}, {@code root} or {@code changeSet} is
     *         missing; {@code EXCLUDE} if the filter provider does not include the commit or
     *         excludes the change set; {@code INCLUDE} otherwise
     */
    private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet changeSet) {
        if (info == null || root == null) {
            return FilterResult.PREFILTERING_SKIPPED;
        }
        FilterProvider provider = this.filterProvider.get();
        boolean commitIncluded = provider.includeCommit(this.contentSession.toString(), info);
        if (!commitIncluded) {
            return FilterResult.EXCLUDE;
        }
        // Without a change set there is nothing to prefilter on.
        if (changeSet == null) {
            return FilterResult.PREFILTERING_SKIPPED;
        }
        return provider.excludes(changeSet) ? FilterResult.EXCLUDE : FilterResult.INCLUDE;
    }

    // Static configuration: warn interval, clock, commit rate limiter tuning and the id counter.
    static {
        QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer.getInteger("oak.observation.full-queue.warn.interval", 30));
        clock = Clock.SIMPLE;

        // Both properties are read up front, then parsed one after the other.
        String thresholdProperty = System.getProperty("oak.commitRateLimiter.delayThreshold");
        String maxDelayProperty = System.getProperty("oak.commitRateLimiter.maxDelay");
        double threshold = parseDelayThreshold(thresholdProperty, 0.8);
        int maxDelay = parseMaxDelay(maxDelayProperty, 10000);

        DELAY_THRESHOLD = threshold;
        MAX_DELAY = maxDelay;
        COUNTER = new AtomicInteger();
    }

    /** Parses the delay threshold property; a missing, empty or unparsable value yields the fallback. */
    private static double parseDelayThreshold(String property, double fallback) {
        double delayThreshold = fallback;
        try {
            if (property != null && !property.isEmpty()) {
                delayThreshold = Double.parseDouble(property);
                LOG.info("<clinit> using oak.commitRateLimiter.delayThreshold of " + delayThreshold);
            }
        } catch (RuntimeException e) {
            LOG.warn("<clinit> could not parse oak.commitRateLimiter.delayThreshold, using default(" + delayThreshold + "): " + e, (Throwable) e);
        }
        return delayThreshold;
    }

    /** Parses the max delay property; a missing, empty or unparsable value yields the fallback. */
    private static int parseMaxDelay(String property, int fallback) {
        int maxDelay = fallback;
        try {
            if (property != null && !property.isEmpty()) {
                maxDelay = Integer.parseInt(property);
                LOG.info("<clinit> using oak.commitRateLimiter.maxDelay of " + maxDelay + "ms");
            }
        } catch (RuntimeException e) {
            LOG.warn("<clinit> could not parse oak.commitRateLimiter.maxDelay, using default(" + maxDelay + "): " + e, (Throwable) e);
        }
        return maxDelay;
    }

    private static class RunningGuard
    extends Monitor.Guard {
        private boolean stopped;

        public RunningGuard(Monitor monitor) {
            super(monitor);
        }

        public boolean isSatisfied() {
            return !this.stopped;
        }

        public boolean stop() {
            boolean wasStopped = this.stopped;
            this.stopped = true;
            return !wasStopped;
        }
    }

    private static class CountingIterator
    implements EventIterator {
        private final long t0 = System.nanoTime();
        private final EventIterator events;
        private long eventCount;
        private long sysTime;

        public CountingIterator(EventIterator events) {
            this.events = events;
        }

        public void updateCounters(MeterStats eventCount, TimerStats eventDuration) {
            Preconditions.checkState((this.eventCount >= 0L ? 1 : 0) != 0);
            eventCount.mark(this.eventCount);
            eventDuration.update(System.nanoTime() - this.t0 - this.sysTime, TimeUnit.NANOSECONDS);
            this.eventCount = -1L;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Event next() {
            if (this.eventCount == -1L) {
                LOG.warn("Access to EventIterator outside the onEvent callback detected. This will cause observation related values in RepositoryStatistics to become unreliable.");
                this.eventCount = -2L;
            }
            long t0 = System.nanoTime();
            try {
                Event event = this.events.nextEvent();
                return event;
            }
            finally {
                ++this.eventCount;
                this.sysTime += System.nanoTime() - t0;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean hasNext() {
            long t0 = System.nanoTime();
            try {
                boolean bl = this.events.hasNext();
                return bl;
            }
            finally {
                this.sysTime += System.nanoTime() - t0;
            }
        }

        public Event nextEvent() {
            return this.next();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void skip(long skipNum) {
            long t0 = System.nanoTime();
            try {
                this.events.skip(skipNum);
            }
            finally {
                this.sysTime += System.nanoTime() - t0;
            }
        }

        public long getSize() {
            return this.events.getSize();
        }

        public long getPosition() {
            return this.events.getPosition();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Outcome of the prefilter evaluation performed by evalPrefilter().
     */
    private static enum FilterResult {
        /** The commit passed the prefilter; it is not excluded and the include counter is incremented. */
        INCLUDE,
        /** The commit is excluded: the filter provider does not include it or excludes its change set. */
        EXCLUDE,
        /** No prefiltering was possible (commit info, root or change set missing); the commit is not excluded. */
        PREFILTERING_SKIPPED;

    }
}

