/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.streams;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog2.indexer.indexset.events.IndexSetCreatedEvent;
import org.graylog2.indexer.indexset.events.IndexSetDeletedEvent;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamRouterEngine;
import org.graylog2.streams.StreamService;
import org.graylog2.streams.events.StreamsChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamRouter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamRouter.class);
    protected final StreamService streamService;
    private final ServerStatus serverStatus;
    private final ScheduledExecutorService scheduler;
    private final AtomicReference<StreamRouterEngine> routerEngine = new AtomicReference<Object>(null);
    private final StreamRouterEngineUpdater engineUpdater;

    @Inject
    public StreamRouter(StreamService streamService, ServerStatus serverStatus, StreamRouterEngine.Factory routerEngineFactory, EventBus serverEventBus, @Named(value="daemonScheduler") ScheduledExecutorService scheduler) {
        this.streamService = streamService;
        this.serverStatus = serverStatus;
        this.scheduler = scheduler;
        this.engineUpdater = new StreamRouterEngineUpdater(this.routerEngine, routerEngineFactory, streamService, this.executorService());
        this.routerEngine.set(this.engineUpdater.getNewEngine());
        serverEventBus.register((Object)this);
    }

    @Subscribe
    public void handleStreamsUpdate(StreamsChangedEvent event) {
        this.scheduler.submit(this.engineUpdater);
    }

    @Subscribe
    public void handleIndexSetCreation(IndexSetCreatedEvent event) {
        this.scheduler.submit(this.engineUpdater);
    }

    @Subscribe
    public void handleIndexSetDeletion(IndexSetDeletedEvent event) {
        this.scheduler.submit(this.engineUpdater);
    }

    private ExecutorService executorService() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("stream-router-%d").setDaemon(true).build();
        return Executors.newCachedThreadPool(threadFactory);
    }

    public List<Stream> route(Message msg) {
        StreamRouterEngine engine = this.routerEngine.get();
        msg.recordCounter(this.serverStatus, "streams-evaluated", engine.getStreams().size());
        return engine.match(msg);
    }

    private static class StreamRouterEngineUpdater
    implements Runnable {
        private final AtomicReference<StreamRouterEngine> routerEngine;
        private final StreamRouterEngine.Factory engineFactory;
        private final StreamService streamService;
        private final ExecutorService executorService;

        public StreamRouterEngineUpdater(AtomicReference<StreamRouterEngine> routerEngine, StreamRouterEngine.Factory engineFactory, StreamService streamService, ExecutorService executorService) {
            this.routerEngine = routerEngine;
            this.engineFactory = engineFactory;
            this.streamService = streamService;
            this.executorService = executorService;
        }

        @Override
        public void run() {
            try {
                StreamRouterEngine engine = this.getNewEngine();
                if (engine.getFingerprint().equals(this.routerEngine.get().getFingerprint())) {
                    LOG.debug("Not updating router engine, streams did not change (fingerprint={})", (Object)engine.getFingerprint());
                } else {
                    LOG.debug("Updating to new stream router engine. (old-fingerprint={} new-fingerprint={}", (Object)this.routerEngine.get().getFingerprint(), (Object)engine.getFingerprint());
                    this.routerEngine.set(engine);
                }
            }
            catch (Exception e) {
                LOG.error("Stream router engine update failed!", (Throwable)e);
            }
        }

        private StreamRouterEngine getNewEngine() {
            return this.engineFactory.create(this.streamService.loadAllEnabled(), this.executorService);
        }
    }
}

