/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.plugins.pipelineprocessor.processors;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.ast.Rule;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigDto;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService;
import org.graylog.plugins.pipelineprocessor.db.RuleService;
import org.graylog.plugins.pipelineprocessor.events.PipelineConnectionsChangedEvent;
import org.graylog.plugins.pipelineprocessor.events.PipelinesChangedEvent;
import org.graylog.plugins.pipelineprocessor.events.RuleMetricsConfigChangedEvent;
import org.graylog.plugins.pipelineprocessor.events.RulesChangedEvent;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.processors.PipelineMetricRegistry;
import org.graylog.plugins.pipelineprocessor.processors.PipelineResolver;
import org.graylog.plugins.pipelineprocessor.processors.PipelineResolverConfig;

@Singleton
public class ConfigurationStateUpdater {
    private static final RateLimitedLog log = PipelineInterpreter.getRateLimitedLog(ConfigurationStateUpdater.class);
    private final RuleMetricsConfigService ruleMetricsConfigService;
    private final ScheduledExecutorService scheduler;
    private final EventBus serverEventBus;
    private final PipelineInterpreter.State.Factory stateFactory;
    private final AtomicReference<PipelineInterpreter.State> latestState = new AtomicReference();
    private final PipelineResolver pipelineResolver;
    private final PipelineMetricRegistry pipelineMetricRegistry;

    @Inject
    public ConfigurationStateUpdater(RuleService ruleService, PipelineService pipelineService, PipelineStreamConnectionsService pipelineStreamConnectionsService, PipelineRuleParser pipelineRuleParser, PipelineResolver.Factory pipelineResolverFactory, RuleMetricsConfigService ruleMetricsConfigService, MetricRegistry metricRegistry, @Named(value="daemonScheduler") ScheduledExecutorService scheduler, EventBus serverEventBus, PipelineInterpreter.State.Factory stateFactory) {
        this.ruleMetricsConfigService = ruleMetricsConfigService;
        this.scheduler = scheduler;
        this.serverEventBus = serverEventBus;
        this.stateFactory = stateFactory;
        this.pipelineResolver = pipelineResolverFactory.create(PipelineResolverConfig.of(() -> ruleService.loadAll().stream(), () -> pipelineService.loadAll().stream(), () -> pipelineStreamConnectionsService.loadAll().stream()), pipelineRuleParser);
        this.pipelineMetricRegistry = PipelineMetricRegistry.create(metricRegistry, Pipeline.class.getName(), Rule.class.getName());
        serverEventBus.register((Object)this);
        this.reloadAndSave();
    }

    private synchronized PipelineInterpreter.State reloadAndSave() {
        ImmutableMap<String, Pipeline> currentPipelines = this.pipelineResolver.resolvePipelines(this.pipelineMetricRegistry);
        ImmutableSetMultimap<String, Pipeline> streamPipelineConnections = this.pipelineResolver.resolveStreamConnections((Map<String, Pipeline>)currentPipelines);
        RuleMetricsConfigDto ruleMetricsConfig = this.ruleMetricsConfigService.get();
        PipelineInterpreter.State newState = this.stateFactory.newState(currentPipelines, streamPipelineConnections, ruleMetricsConfig);
        this.latestState.set(newState);
        return newState;
    }

    public PipelineInterpreter.State getLatestState() {
        return this.latestState.get();
    }

    @Subscribe
    public void handleRuleChanges(RulesChangedEvent event) {
        event.deletedRuleIds().forEach(id -> {
            log.debug("Invalidated rule {}", id);
            this.pipelineMetricRegistry.removeRuleMetrics((String)id);
        });
        event.updatedRuleIds().forEach(id -> log.debug("Refreshing rule {}", id));
        this.scheduler.schedule(() -> this.serverEventBus.post((Object)this.reloadAndSave()), 0L, TimeUnit.SECONDS);
    }

    @Subscribe
    public void handlePipelineChanges(PipelinesChangedEvent event) {
        event.deletedPipelineIds().forEach(id -> {
            log.debug("Invalidated pipeline {}", id);
            this.pipelineMetricRegistry.removePipelineMetrics((String)id);
        });
        event.updatedPipelineIds().forEach(id -> log.debug("Refreshing pipeline {}", id));
        this.scheduler.schedule(() -> this.serverEventBus.post((Object)this.reloadAndSave()), 0L, TimeUnit.SECONDS);
    }

    @Subscribe
    public void handlePipelineConnectionChanges(PipelineConnectionsChangedEvent event) {
        log.debug("Pipeline stream connection changed: {}", (Object)event);
        this.scheduler.schedule(() -> this.serverEventBus.post((Object)this.reloadAndSave()), 0L, TimeUnit.SECONDS);
    }

    @Subscribe
    public void handlePipelineStateChange(PipelineInterpreter.State event) {
        log.debug("Pipeline interpreter state got updated");
    }

    @Subscribe
    public void handleRuleMetricsConfigChange(RuleMetricsConfigChangedEvent event) {
        log.debug("Rule metrics config changed: {}", (Object)event);
        this.scheduler.schedule(() -> this.serverEventBus.post((Object)this.reloadAndSave()), 0L, TimeUnit.SECONDS);
    }
}

