/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.streams;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.QueryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.bson.types.ObjectId;
import org.graylog.security.entities.EntityOwnershipService;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.PersistedServiceImpl;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.MongoIndexSet;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.indexer.indexset.IndexSetService;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.database.users.User;
import org.graylog2.plugin.streams.Output;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.rest.resources.streams.requests.CreateStreamRequest;
import org.graylog2.streams.OutputService;
import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.StreamService;
import org.graylog2.streams.events.StreamDeletedEvent;
import org.graylog2.streams.events.StreamsChangedEvent;
import org.mongojack.DBProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamServiceImpl
extends PersistedServiceImpl
implements StreamService {
    private static final Logger LOG = LoggerFactory.getLogger(StreamServiceImpl.class);
    private final StreamRuleService streamRuleService;
    private final OutputService outputService;
    private final IndexSetService indexSetService;
    private final MongoIndexSet.Factory indexSetFactory;
    private final NotificationService notificationService;
    private final EntityOwnershipService entityOwnershipService;
    private final ClusterEventBus clusterEventBus;

    @Inject
    public StreamServiceImpl(MongoConnection mongoConnection, StreamRuleService streamRuleService, OutputService outputService, IndexSetService indexSetService, MongoIndexSet.Factory indexSetFactory, NotificationService notificationService, EntityOwnershipService entityOwnershipService, ClusterEventBus clusterEventBus) {
        super(mongoConnection);
        this.streamRuleService = streamRuleService;
        this.outputService = outputService;
        this.indexSetService = indexSetService;
        this.indexSetFactory = indexSetFactory;
        this.notificationService = notificationService;
        this.entityOwnershipService = entityOwnershipService;
        this.clusterEventBus = clusterEventBus;
    }

    @Nullable
    private IndexSet getIndexSet(DBObject dbObject) {
        return this.getIndexSet((String)dbObject.get("index_set_id"));
    }

    @Nullable
    private IndexSet getIndexSet(String id) {
        if (Strings.isNullOrEmpty((String)id)) {
            return null;
        }
        Optional<IndexSetConfig> indexSetConfig = this.indexSetService.get(id);
        return indexSetConfig.flatMap(c -> Optional.of(this.indexSetFactory.create((IndexSetConfig)c))).orElse(null);
    }

    public Stream load(ObjectId id) throws NotFoundException {
        DBObject o = this.get(StreamImpl.class, id);
        if (o == null) {
            throw new NotFoundException("Stream <" + id + "> not found!");
        }
        List<StreamRule> streamRules = this.streamRuleService.loadForStreamId(id.toHexString());
        Set<Output> outputs = this.loadOutputsForRawStream(o);
        Map fields = o.toMap();
        return new StreamImpl((ObjectId)o.get("_id"), fields, streamRules, outputs, this.getIndexSet(o));
    }

    @Override
    public Stream create(Map<String, Object> fields) {
        return new StreamImpl(fields, this.getIndexSet((String)fields.get("index_set_id")));
    }

    @Override
    public Stream create(CreateStreamRequest cr, String userId) {
        HashMap streamData = Maps.newHashMap();
        streamData.put("title", cr.title());
        streamData.put("description", cr.description());
        streamData.put("creator_user_id", userId);
        streamData.put("created_at", Tools.nowUTC());
        streamData.put("content_pack", cr.contentPack());
        streamData.put("matching_type", cr.matchingType().toString());
        streamData.put("disabled", false);
        streamData.put("remove_matches_from_default_stream", cr.removeMatchesFromDefaultStream());
        streamData.put("index_set_id", cr.indexSetId());
        return this.create(streamData);
    }

    @Override
    public Stream load(String id) throws NotFoundException {
        try {
            return this.load(new ObjectId(id));
        }
        catch (IllegalArgumentException e) {
            throw new NotFoundException("Stream <" + id + "> not found!");
        }
    }

    @Override
    public List<Stream> loadAllEnabled() {
        return this.loadAllEnabled(new HashMap<String, Object>());
    }

    public List<Stream> loadAllEnabled(Map<String, Object> additionalQueryOpts) {
        additionalQueryOpts.put("disabled", false);
        return this.loadAll(additionalQueryOpts);
    }

    @Override
    public List<Stream> loadAll() {
        return this.loadAll(Collections.emptyMap());
    }

    public List<Stream> loadAll(Map<String, Object> additionalQueryOpts) {
        BasicDBObject query = new BasicDBObject(additionalQueryOpts);
        return this.loadAll((DBObject)query);
    }

    private List<Stream> loadAll(DBObject query) {
        List<DBObject> results = this.query(StreamImpl.class, query);
        List<String> streamIds = results.stream().map(o -> o.get("_id").toString()).collect(Collectors.toList());
        Map<String, List<StreamRule>> allStreamRules = this.streamRuleService.loadForStreamIds(streamIds);
        ImmutableList.Builder streams = ImmutableList.builder();
        Map<String, IndexSet> indexSets = this.indexSetsForStreams(results);
        Set<String> outputIds = results.stream().map(this::outputIdsForRawStream).flatMap(outputs -> outputs.stream().map(ObjectId::toHexString)).collect(Collectors.toSet());
        Map outputsById = this.outputService.loadByIds(outputIds).stream().collect(Collectors.toMap(Output::getId, Function.identity()));
        for (DBObject o2 : results) {
            ObjectId objectId = (ObjectId)o2.get("_id");
            String id = objectId.toHexString();
            List<StreamRule> streamRules = allStreamRules.getOrDefault(id, Collections.emptyList());
            LOG.debug("Found {} rules for stream <{}>", (Object)streamRules.size(), (Object)id);
            Set<Output> outputs2 = this.outputIdsForRawStream(o2).stream().map(ObjectId::toHexString).map(outputId -> {
                Output output = (Output)outputsById.get(outputId);
                if (output == null) {
                    String streamTitle = Strings.nullToEmpty((String)((String)o2.get("title")));
                    LOG.warn("Stream \"" + streamTitle + "\" <" + id + "> references missing output <" + outputId + "> - ignoring output.");
                }
                return output;
            }).filter(Objects::nonNull).collect(Collectors.toSet());
            Map fields = o2.toMap();
            String indexSetId = (String)fields.get("index_set_id");
            streams.add((Object)new StreamImpl(objectId, fields, streamRules, outputs2, indexSets.get(indexSetId)));
        }
        return streams.build();
    }

    private List<ObjectId> outputIdsForRawStream(DBObject o) {
        List objectIds = (List)o.get("outputs");
        return objectIds == null ? Collections.emptyList() : objectIds;
    }

    private Map<String, IndexSet> indexSetsForStreams(List<DBObject> streams) {
        Set<String> indexSetIds = streams.stream().map(stream -> (String)stream.get("index_set_id")).filter(s -> !Strings.isNullOrEmpty((String)s)).collect(Collectors.toSet());
        return this.indexSetService.findByIds(indexSetIds).stream().collect(Collectors.toMap(IndexSetConfig::id, this.indexSetFactory::create));
    }

    @Override
    public Set<Stream> loadByIds(Collection<String> streamIds) {
        Set objectIds = streamIds.stream().map(ObjectId::new).collect(Collectors.toSet());
        DBObject query = QueryBuilder.start((String)"_id").in(objectIds).get();
        return ImmutableSet.copyOf(this.loadAll(query));
    }

    @Override
    public Set<String> indexSetIdsByIds(Collection<String> streamIds) {
        Set objectIds = streamIds.stream().map(ObjectId::new).collect(Collectors.toSet());
        DBObject query = QueryBuilder.start((String)"_id").in(objectIds).get();
        DBProjection.ProjectionBuilder onlyIndexSetIdField = DBProjection.include((String[])new String[]{"index_set_id"});
        return StreamSupport.stream(this.collection(StreamImpl.class).find(query, (DBObject)onlyIndexSetIdField).spliterator(), false).map(s -> s.get("index_set_id").toString()).collect(Collectors.toSet());
    }

    protected Set<Output> loadOutputsForRawStream(DBObject stream) {
        List<ObjectId> outputIds = this.outputIdsForRawStream(stream);
        HashSet<Output> result = new HashSet<Output>();
        if (outputIds != null) {
            for (ObjectId outputId : outputIds) {
                try {
                    result.add(this.outputService.load(outputId.toHexString()));
                }
                catch (NotFoundException e) {
                    LOG.warn("Non-existing output <{}> referenced from stream <{}>!", (Object)outputId.toHexString(), stream.get("_id"));
                }
            }
        }
        return result;
    }

    @Override
    public long count() {
        return this.totalCount(StreamImpl.class);
    }

    @Override
    public void destroy(Stream stream) throws NotFoundException {
        for (StreamRule streamRule : this.streamRuleService.loadForStream(stream)) {
            super.destroy(streamRule);
        }
        String streamId = stream.getId();
        for (Notification notification : this.notificationService.all()) {
            Object rawValue = notification.getDetail("stream_id");
            if (rawValue == null || !rawValue.toString().equals(streamId)) continue;
            LOG.debug("Removing notification that references stream: {}", (Object)notification);
            this.notificationService.destroy(notification);
        }
        super.destroy(stream);
        this.clusterEventBus.post(StreamsChangedEvent.create(streamId));
        this.clusterEventBus.post(StreamDeletedEvent.create(streamId));
        this.entityOwnershipService.unregisterStream(streamId);
    }

    public void update(Stream stream, String title, String description) throws ValidationException {
        if (title != null) {
            stream.getFields().put("title", title);
        }
        if (description != null) {
            stream.getFields().put("description", description);
        }
        this.save(stream);
    }

    @Override
    public void pause(Stream stream) throws ValidationException {
        stream.setDisabled(true);
        String streamId = this.save(stream);
        this.clusterEventBus.post(StreamsChangedEvent.create(streamId));
    }

    @Override
    public void resume(Stream stream) throws ValidationException {
        stream.setDisabled(false);
        String streamId = this.save(stream);
        this.clusterEventBus.post(StreamsChangedEvent.create(streamId));
    }

    @Override
    public void addOutput(Stream stream, Output output) {
        this.collection(stream).update((DBObject)new BasicDBObject("_id", (Object)new ObjectId(stream.getId())), (DBObject)new BasicDBObject("$addToSet", (Object)new BasicDBObject("outputs", (Object)new ObjectId(output.getId()))));
        this.clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
    }

    @Override
    public void addOutputs(ObjectId streamId, Collection<ObjectId> outputIds) {
        BasicDBList outputs = new BasicDBList();
        outputs.addAll(outputIds);
        this.collection(StreamImpl.class).update((DBObject)new BasicDBObject("_id", (Object)streamId), (DBObject)new BasicDBObject("$addToSet", (Object)new BasicDBObject("outputs", (Object)new BasicDBObject("$each", (Object)outputs))));
        this.clusterEventBus.post(StreamsChangedEvent.create(streamId.toHexString()));
    }

    @Override
    public void removeOutput(Stream stream, Output output) {
        this.collection(stream).update((DBObject)new BasicDBObject("_id", (Object)new ObjectId(stream.getId())), (DBObject)new BasicDBObject("$pull", (Object)new BasicDBObject("outputs", (Object)new ObjectId(output.getId()))));
        this.clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
    }

    @Override
    public void removeOutputFromAllStreams(Output output) {
        ImmutableSet updatedStreams;
        ObjectId outputId = new ObjectId(output.getId());
        BasicDBObject match = new BasicDBObject("outputs", (Object)outputId);
        BasicDBObject modify = new BasicDBObject("$pull", (Object)new BasicDBObject("outputs", (Object)outputId));
        try (DBCursor cursor = this.collection(StreamImpl.class).find((DBObject)match);){
            updatedStreams = (ImmutableSet)StreamSupport.stream(cursor.spliterator(), false).map(stream -> stream.get("_id")).filter(Objects::nonNull).map(id -> ((ObjectId)id).toHexString()).collect(ImmutableSet.toImmutableSet());
        }
        this.collection(StreamImpl.class).update((DBObject)match, (DBObject)modify, false, true);
        this.clusterEventBus.post(StreamsChangedEvent.create((ImmutableSet<String>)updatedStreams));
    }

    @Override
    public List<Stream> loadAllWithIndexSet(String indexSetId) {
        BasicDBObject query = new BasicDBObject("index_set_id", (Object)indexSetId);
        return this.loadAll((Map<String, Object>)query);
    }

    @Override
    public String save(Stream stream) throws ValidationException {
        String savedStreamId = super.save(stream);
        this.clusterEventBus.post(StreamsChangedEvent.create(savedStreamId));
        return savedStreamId;
    }

    @Override
    public String saveWithRulesAndOwnership(Stream stream, Collection<StreamRule> streamRules, User user) throws ValidationException {
        String savedStreamId = super.save(stream);
        Set<StreamRule> rules = streamRules.stream().map(rule -> this.streamRuleService.copy(savedStreamId, (StreamRule)rule)).collect(Collectors.toSet());
        this.streamRuleService.save(rules);
        this.entityOwnershipService.registerNewStream(savedStreamId, user);
        this.clusterEventBus.post(StreamsChangedEvent.create(savedStreamId));
        return savedStreamId;
    }
}

