/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.outputs.filter;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.SetMultimap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
import org.graylog2.indexer.messages.ImmutableMessage;
import org.graylog2.outputs.filter.DefaultFilteredMessage;
import org.graylog2.outputs.filter.FilteredMessage;
import org.graylog2.outputs.filter.OutputFilter;
import org.graylog2.outputs.filter.PipelineRuleOutputFilterState;
import org.graylog2.outputs.filter.PipelineRuleOutputFilterStateUpdater;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.messageq.noop.NoopMessageQueueAcknowledger;
import org.graylog2.shared.utilities.StringUtils;
import org.graylog2.streams.filters.StreamDestinationFilterDeletedEvent;
import org.graylog2.streams.filters.StreamDestinationFilterUpdatedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class PipelineRuleOutputFilter
implements OutputFilter {
    private static final Logger LOG = LoggerFactory.getLogger(PipelineRuleOutputFilter.class);
    private static final NoopInterpreterListener NOOP_INTERPRETER_LISTENER = new NoopInterpreterListener();
    public static final String METADATA_KEY = PipelineRuleOutputFilter.class.getCanonicalName();
    private final PipelineRuleOutputFilterStateUpdater stateUpdater;
    private final ScheduledExecutorService scheduler;
    private final AtomicReference<PipelineRuleOutputFilterState> activeState = new AtomicReference();
    private final PipelineInterpreter pipelineInterpreter;
    private final Timer executionTime;

    @Inject
    public PipelineRuleOutputFilter(PipelineRuleOutputFilterStateUpdater stateUpdater, @Named(value="daemonScheduler") ScheduledExecutorService scheduler, MetricRegistry metricRegistry, EventBus eventBus) {
        this.stateUpdater = stateUpdater;
        this.scheduler = scheduler;
        this.pipelineInterpreter = new PipelineInterpreter(new NoopMessageQueueAcknowledger(), new MetricRegistry(), null);
        this.executionTime = metricRegistry.timer(MetricRegistry.name(this.getClass(), (String[])new String[]{"executionTime"}));
        eventBus.register((Object)this);
        stateUpdater.init(this.activeState);
    }

    @Subscribe
    private void handleFilterUpdate(StreamDestinationFilterUpdatedEvent event) {
        LOG.debug("Handling filter update: {}", (Object)event);
        this.scheduler.execute(() -> this.stateUpdater.reloadForUpdate(this.activeState, event.ids()));
    }

    @Subscribe
    private void handleFilterDelete(StreamDestinationFilterDeletedEvent event) {
        LOG.debug("Handling filter deletion: {}", (Object)event);
        this.scheduler.execute(() -> this.stateUpdater.reloadForDelete(this.activeState, event.ids()));
    }

    @Override
    public FilteredMessage apply(Message msg) {
        try (Timer.Context ignored = this.executionTime.time();){
            DefaultFilteredMessage defaultFilteredMessage = this.doApply(msg);
            return defaultFilteredMessage;
        }
    }

    private DefaultFilteredMessage doApply(Message msg) {
        PipelineRuleOutputFilterState state = this.activeState.get();
        if (state == null) {
            throw new IllegalStateException("Active state has not been initialized");
        }
        Metadata newMetadata = Metadata.forDestinationsAndStreams(state.getDestinations(), msg.getStreams());
        if (state.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("No active filter - returning early");
            }
            return new DefaultFilteredMessage(ImmutableMessage.wrap(msg), newMetadata.destinations());
        }
        msg.setMetadata(METADATA_KEY, newMetadata);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Message metadata before running filter: {} (message-id={})", (Object)newMetadata, (Object)msg.getId());
        }
        Set<Pipeline> pipelinesToRun = state.getPipelinesForMessage(msg);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Running {} pipelines for message <{}>: {}", new Object[]{pipelinesToRun.size(), msg.getId(), pipelinesToRun});
        }
        this.pipelineInterpreter.processForResolvedPipelines(msg, msg.getId(), pipelinesToRun, NOOP_INTERPRETER_LISTENER, this.activeState.get());
        Object object = msg.getMetadataValue(METADATA_KEY);
        if (object instanceof Metadata) {
            Metadata metadata = (Metadata)object;
            msg.removeMetadata(METADATA_KEY);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message metadata after running filter: {} (message-id={})", (Object)metadata, (Object)msg.getId());
            }
            return new DefaultFilteredMessage(ImmutableMessage.wrap(msg), metadata.destinations());
        }
        throw new IllegalStateException(StringUtils.f("No metadata found for message <%s> - this should not happen!", msg.getId()));
    }

    public record Metadata(Multimap<String, Stream> destinations) {
        public static Metadata forDestinationsAndStreams(Set<String> destinations, Set<Stream> streams) {
            SetMultimap destinationsBuilder = MultimapBuilder.hashKeys().hashSetValues().build();
            destinations.forEach(arg_0 -> Metadata.lambda$forDestinationsAndStreams$0((Multimap)destinationsBuilder, streams, arg_0));
            return new Metadata((Multimap<String, Stream>)destinationsBuilder);
        }

        private static /* synthetic */ void lambda$forDestinationsAndStreams$0(Multimap destinationsBuilder, Set streams, String destination) {
            destinationsBuilder.putAll((Object)destination, (Iterable)streams);
        }
    }
}

