/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.rest.resources.streams;

import com.codahale.metrics.annotation.Timed;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.Response;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.bson.types.ObjectId;
import org.graylog.grn.GRNTypes;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.rest.PipelineCompactSource;
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
import org.graylog.plugins.views.startpage.recentActivities.RecentActivityService;
import org.graylog.security.UserContext;
import org.graylog2.audit.AuditEventSender;
import org.graylog2.audit.jersey.AuditEvent;
import org.graylog2.audit.jersey.DefaultFailureContextCreator;
import org.graylog2.audit.jersey.NoAuditEvent;
import org.graylog2.audit.jersey.SuccessContextCreator;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.PaginatedList;
import org.graylog2.database.filtering.DbQueryCreator;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.database.Persisted;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Output;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.rest.bulk.AuditParams;
import org.graylog2.rest.bulk.BulkExecutor;
import org.graylog2.rest.bulk.SequentialBulkExecutor;
import org.graylog2.rest.bulk.model.BulkOperationRequest;
import org.graylog2.rest.bulk.model.BulkOperationResponse;
import org.graylog2.rest.models.streams.requests.UpdateStreamRequest;
import org.graylog2.rest.models.system.outputs.responses.OutputSummary;
import org.graylog2.rest.models.tools.responses.PageListResponse;
import org.graylog2.rest.resources.entities.EntityAttribute;
import org.graylog2.rest.resources.entities.EntityDefaults;
import org.graylog2.rest.resources.entities.FilterOption;
import org.graylog2.rest.resources.entities.Sorting;
import org.graylog2.rest.resources.streams.requests.CloneStreamRequest;
import org.graylog2.rest.resources.streams.requests.CreateStreamRequest;
import org.graylog2.rest.resources.streams.responses.StreamCreatedResponse;
import org.graylog2.rest.resources.streams.responses.StreamListResponse;
import org.graylog2.rest.resources.streams.responses.StreamResponse;
import org.graylog2.rest.resources.streams.responses.TestMatchResponse;
import org.graylog2.rest.resources.streams.rules.requests.CreateStreamRuleRequest;
import org.graylog2.search.SearchQueryField;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.streams.PaginatedStreamService;
import org.graylog2.streams.StreamDTO;
import org.graylog2.streams.StreamGuardException;
import org.graylog2.streams.StreamRouterEngine;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;

@RequiresAuthentication
@Api(value="Streams", description="Manage streams", tags={"cloud"})
@Path(value="/streams")
public class StreamResource
extends RestResource {
    private static final String DEFAULT_SORT_FIELD = "title";
    private static final String DEFAULT_SORT_DIRECTION = "asc";
    private static final List<EntityAttribute> attributes = List.of(EntityAttribute.builder().id("_id").title("id").type(SearchQueryField.Type.OBJECT_ID).hidden(true).searchable(true).build(), EntityAttribute.builder().id("title").title("Title").searchable(true).build(), EntityAttribute.builder().id("description").title("Description").searchable(true).build(), EntityAttribute.builder().id("created_at").title("Created").type(SearchQueryField.Type.DATE).filterable(true).build(), EntityAttribute.builder().id("index_set_id").title("Index set").relatedCollection("index_sets").hidden(true).filterable(true).build(), EntityAttribute.builder().id("disabled").title("Status").type(SearchQueryField.Type.BOOLEAN).filterable(true).filterOptions(Set.of(FilterOption.create("true", "Paused"), FilterOption.create("false", "Running"))).build());
    private static final EntityDefaults settings = EntityDefaults.builder().sort(Sorting.create("title", Sorting.Direction.valueOf("asc".toUpperCase(Locale.ROOT)))).build();
    private final PaginatedStreamService paginatedStreamService;
    private final MessageFactory messageFactory;
    private final StreamService streamService;
    private final StreamRuleService streamRuleService;
    private final StreamRouterEngine.Factory streamRouterEngineFactory;
    private final IndexSetRegistry indexSetRegistry;
    private final RecentActivityService recentActivityService;
    private final BulkExecutor<Stream, UserContext> bulkStreamDeleteExecutor;
    private final BulkExecutor<Stream, UserContext> bulkStreamStartExecutor;
    private final BulkExecutor<Stream, UserContext> bulkStreamStopExecutor;
    private final PipelineStreamConnectionsService pipelineStreamConnectionsService;
    private final PipelineService pipelineService;
    private final DbQueryCreator dbQueryCreator;

    @Inject
    public StreamResource(StreamService streamService, PaginatedStreamService paginatedStreamService, StreamRuleService streamRuleService, StreamRouterEngine.Factory streamRouterEngineFactory, IndexSetRegistry indexSetRegistry, RecentActivityService recentActivityService, AuditEventSender auditEventSender, MessageFactory messageFactory, PipelineStreamConnectionsService pipelineStreamConnectionsService, PipelineService pipelineService) {
        this.streamService = streamService;
        this.streamRuleService = streamRuleService;
        this.streamRouterEngineFactory = streamRouterEngineFactory;
        this.indexSetRegistry = indexSetRegistry;
        this.paginatedStreamService = paginatedStreamService;
        this.messageFactory = messageFactory;
        this.pipelineStreamConnectionsService = pipelineStreamConnectionsService;
        this.pipelineService = pipelineService;
        this.dbQueryCreator = new DbQueryCreator(DEFAULT_SORT_FIELD, attributes);
        this.recentActivityService = recentActivityService;
        SuccessContextCreator<Stream> successAuditLogContextCreator = (entity, entityClass) -> Map.of("response_entity", Map.of("stream_id", entity.getId(), DEFAULT_SORT_FIELD, entity.getTitle()));
        DefaultFailureContextCreator failureAuditLogContextCreator = new DefaultFailureContextCreator();
        this.bulkStreamDeleteExecutor = new SequentialBulkExecutor<Stream, UserContext>(this::deleteInner, auditEventSender, successAuditLogContextCreator, failureAuditLogContextCreator);
        this.bulkStreamStartExecutor = new SequentialBulkExecutor<Stream, UserContext>(this::resumeInner, auditEventSender, successAuditLogContextCreator, failureAuditLogContextCreator);
        this.bulkStreamStopExecutor = new SequentialBulkExecutor<Stream, UserContext>(this::pauseInner, auditEventSender, successAuditLogContextCreator, failureAuditLogContextCreator);
    }

    @POST
    @Timed
    @ApiOperation(value="Create a stream", response=StreamCreatedResponse.class)
    @RequiresPermissions(value={"streams:create"})
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    @AuditEvent(type="server:stream:create")
    public Response create(@ApiParam(name="JSON body", required=true) CreateStreamRequest cr, @Context UserContext userContext) throws ValidationException {
        Stream stream = this.streamService.create(cr, this.getCurrentUser().getName());
        stream.setDisabled(true);
        IndexSet indexSet = stream.getIndexSet();
        this.checkIndexSet(indexSet);
        Set<StreamRule> streamRules = cr.rules().stream().map(streamRule -> this.streamRuleService.create(null, (CreateStreamRuleRequest)streamRule)).collect(Collectors.toSet());
        String id = this.streamService.saveWithRulesAndOwnership(stream, streamRules, userContext.getUser());
        StreamCreatedResponse result = new StreamCreatedResponse(id);
        URI streamUri = this.getUriBuilderToSelf().path(StreamResource.class).path("{streamId}").build(new Object[]{id});
        this.recentActivityService.create(id, GRNTypes.STREAM, userContext.getUser());
        return Response.created((URI)streamUri).entity((Object)result).build();
    }

    @GET
    @Timed
    @Path(value="/paginated")
    @ApiOperation(value="Get a paginated list of streams")
    @Produces(value={"application/json"})
    public PageListResponse<StreamDTO> getPage(@ApiParam(name="page") @QueryParam(value="page") @DefaultValue(value="1") int page, @ApiParam(name="per_page") @QueryParam(value="per_page") @DefaultValue(value="50") int perPage, @ApiParam(name="query") @QueryParam(value="query") @DefaultValue(value="") String query, @ApiParam(name="filters") @QueryParam(value="filters") List<String> filters, @ApiParam(name="sort", value="The field to sort the result on", required=true, allowableValues="title,description,created_at,updated_at,status") @DefaultValue(value="title") @QueryParam(value="sort") String sort, @ApiParam(name="order", value="The sort direction", allowableValues="asc, desc") @DefaultValue(value="asc") @QueryParam(value="order") String order) {
        Predicate<StreamDTO> permissionFilter = streamDTO -> this.isPermitted("streams:read", streamDTO.id());
        PaginatedList<StreamDTO> result = this.paginatedStreamService.findPaginated(this.dbQueryCreator.createDbQuery(filters, query), permissionFilter, page, perPage, sort, order);
        List<String> streamIds = result.stream().map(StreamDTO::id).toList();
        Map<String, List<StreamRule>> streamRuleMap = this.streamRuleService.loadForStreamIds(streamIds);
        List<StreamDTO> streams = result.stream().map(streamDTO -> {
            List<StreamRule> rules = streamRuleMap.getOrDefault(streamDTO.id(), Collections.emptyList());
            return streamDTO.toBuilder().rules(rules).build();
        }).toList();
        long total = this.paginatedStreamService.count();
        PaginatedList<StreamDTO> streamDTOS = new PaginatedList<StreamDTO>(streams, result.pagination().total(), result.pagination().page(), result.pagination().perPage());
        return PageListResponse.create(query, streamDTOS.pagination(), total, sort, order, streams, attributes, settings);
    }

    @GET
    @Timed
    @ApiOperation(value="Get a list of all streams")
    @Deprecated
    @Produces(value={"application/json"})
    public StreamListResponse get() {
        List<Stream> streams = this.streamService.loadAll().stream().filter(stream -> this.isPermitted("streams:read", stream.getId())).toList();
        return StreamListResponse.create(streams.size(), streams.stream().map(this::streamToResponse).collect(Collectors.toSet()));
    }

    @GET
    @Path(value="/enabled")
    @Timed
    @ApiOperation(value="Get a list of all enabled streams")
    @Produces(value={"application/json"})
    public StreamListResponse getEnabled() {
        List<Stream> streams = this.streamService.loadAllEnabled().stream().filter(stream -> this.isPermitted("streams:read", stream.getId())).toList();
        return StreamListResponse.create(streams.size(), streams.stream().map(this::streamToResponse).collect(Collectors.toSet()));
    }

    @GET
    @Path(value="/{streamId}")
    @Timed
    @ApiOperation(value="Get a single stream")
    @Produces(value={"application/json"})
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid ObjectId.")})
    public StreamResponse get(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") @NotEmpty String streamId) throws NotFoundException {
        this.checkPermission("streams:read", streamId);
        return this.streamToResponse(this.streamService.load(streamId));
    }

    @PUT
    @Timed
    @Path(value="/{streamId}")
    @ApiOperation(value="Update a stream")
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid ObjectId.")})
    @AuditEvent(type="server:stream:update")
    public StreamResponse update(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") String streamId, @ApiParam(name="JSON body", required=true) @Valid @NotNull UpdateStreamRequest cr, @Context UserContext userContext) throws NotFoundException, ValidationException {
        IndexSet indexSet;
        Boolean removeMatchesFromDefaultStream;
        this.checkPermission("streams:edit", streamId);
        this.checkNotEditableStream(streamId, "The stream cannot be edited.");
        Stream stream = this.streamService.load(streamId);
        if (!Strings.isNullOrEmpty((String)cr.title())) {
            stream.setTitle(cr.title().strip());
        }
        stream.setDescription(cr.description());
        if (cr.matchingType() != null) {
            try {
                stream.setMatchingType(Stream.MatchingType.valueOf(cr.matchingType()));
            }
            catch (IllegalArgumentException e) {
                throw new BadRequestException("Invalid matching type '" + cr.matchingType() + "' specified. Should be one of: " + Arrays.toString((Object[])Stream.MatchingType.values()));
            }
        }
        if ((removeMatchesFromDefaultStream = cr.removeMatchesFromDefaultStream()) != null) {
            stream.setRemoveMatchesFromDefaultStream(removeMatchesFromDefaultStream);
        }
        if (!Strings.isNullOrEmpty((String)cr.indexSetId())) {
            stream.setIndexSetId(cr.indexSetId());
        }
        if (!(indexSet = this.indexSetRegistry.get(stream.getIndexSetId()).orElseThrow(() -> new BadRequestException("Index set with ID <" + stream.getIndexSetId() + "> does not exist!"))).getConfig().isWritable()) {
            throw new BadRequestException("Assigned index set must be writable!");
        }
        if (!indexSet.getConfig().isRegularIndex()) {
            throw new BadRequestException("Assigned index set is not usable");
        }
        this.streamService.save(stream);
        this.recentActivityService.update(streamId, GRNTypes.STREAM, userContext.getUser());
        return this.streamToResponse(stream);
    }

    @DELETE
    @Path(value="/{streamId}")
    @Timed
    @ApiOperation(value="Delete a stream")
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid ObjectId.")})
    @AuditEvent(type="server:stream:delete")
    public void delete(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") String streamId, @Context UserContext userContext) throws NotFoundException {
        this.deleteInner(streamId, userContext);
    }

    private Stream deleteInner(String streamId, UserContext userContext) throws NotFoundException {
        this.checkPermission("streams:edit", streamId);
        this.checkNotEditableStream(streamId, "The stream cannot be deleted.");
        Stream stream = this.streamService.load(streamId);
        try {
            this.streamService.destroy(stream);
        }
        catch (StreamGuardException e) {
            throw new BadRequestException(e.getMessage());
        }
        this.recentActivityService.delete(streamId, GRNTypes.STREAM, stream.getTitle(), userContext.getUser());
        return stream;
    }

    @POST
    @Path(value="/bulk_delete")
    @Consumes(value={"application/json"})
    @Timed
    @ApiOperation(value="Delete a bulk of streams", response=BulkOperationResponse.class)
    @NoAuditEvent(value="Audit events triggered manually")
    public Response bulkDelete(@ApiParam(name="Entities to remove", required=true) BulkOperationRequest bulkOperationRequest, @Context UserContext userContext) {
        BulkOperationResponse response = this.bulkStreamDeleteExecutor.executeBulkOperation(bulkOperationRequest, userContext, new AuditParams("server:stream:delete", "streamId", Stream.class));
        return Response.status((Response.Status)Response.Status.OK).entity((Object)response).build();
    }

    @POST
    @Path(value="/bulk_pause")
    @Consumes(value={"application/json"})
    @Timed
    @ApiOperation(value="Pause a bulk of streams", response=BulkOperationResponse.class)
    @NoAuditEvent(value="Audit events triggered manually")
    public Response bulkPause(@ApiParam(name="Streams to pause", required=true) BulkOperationRequest bulkOperationRequest, @Context UserContext userContext) {
        BulkOperationResponse response = this.bulkStreamStopExecutor.executeBulkOperation(bulkOperationRequest, userContext, new AuditParams("server:stream:stop", "streamId", Stream.class));
        return Response.status((Response.Status)Response.Status.OK).entity((Object)response).build();
    }

    @POST
    @Path(value="/bulk_resume")
    @Consumes(value={"application/json"})
    @Timed
    @ApiOperation(value="Resume a bulk of streams", response=BulkOperationResponse.class)
    @NoAuditEvent(value="Audit events triggered manually")
    public Response bulkResume(@ApiParam(name="Streams to resume", required=true) BulkOperationRequest bulkOperationRequest, @Context UserContext userContext) {
        BulkOperationResponse response = this.bulkStreamStartExecutor.executeBulkOperation(bulkOperationRequest, userContext, new AuditParams("server:stream:start", "streamId", Stream.class));
        return Response.status((Response.Status)Response.Status.OK).entity((Object)response).build();
    }

    @POST
    @Path(value="/{streamId}/pause")
    @Timed
    @ApiOperation(value="Pause a stream")
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid or missing Stream id.")})
    @AuditEvent(type="server:stream:stop")
    public void pause(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") @NotEmpty String streamId) throws NotFoundException, ValidationException {
        this.pauseInner(streamId, null);
    }

    private Stream pauseInner(String streamId, UserContext userContext) throws NotFoundException, ValidationException {
        this.checkAnyPermission(new String[]{"streams:changestate", "streams:edit"}, streamId);
        this.checkNotEditableStream(streamId, "The stream cannot be paused.");
        Stream stream = this.streamService.load(streamId);
        this.streamService.pause(stream);
        return stream;
    }

    @POST
    @Path(value="/{streamId}/resume")
    @Timed
    @ApiOperation(value="Resume a stream")
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid or missing Stream id.")})
    @AuditEvent(type="server:stream:start")
    public void resume(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") @NotEmpty String streamId, @Context UserContext userContext) throws NotFoundException, ValidationException {
        this.resumeInner(streamId, null);
    }

    private Stream resumeInner(String streamId, UserContext userContext) throws NotFoundException, ValidationException {
        this.checkAnyPermission(new String[]{"streams:changestate", "streams:edit"}, streamId);
        this.checkNotEditableStream(streamId, "The stream cannot be resumed.");
        Stream stream = this.streamService.load(streamId);
        this.streamService.resume(stream);
        return stream;
    }

    @POST
    @Path(value="/{streamId}/testMatch")
    @Timed
    @ApiOperation(value="Test matching of a stream against a supplied message")
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid or missing Stream id.")})
    @NoAuditEvent(value="only used for testing stream matches")
    public TestMatchResponse testMatch(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") String streamId, @ApiParam(name="JSON body", required=true) @NotNull Map<String, Map<String, Object>> serialisedMessage) throws NotFoundException {
        this.checkPermission("streams:read", streamId);
        Stream stream = this.streamService.load(streamId);
        HashMap<String, Object> m = new HashMap<String, Object>(serialisedMessage.get("message"));
        String timeStamp = (String)MoreObjects.firstNonNull((Object)((String)m.get("timestamp")), (Object)DateTime.now((DateTimeZone)DateTimeZone.UTC).toString(ISODateTimeFormat.dateTime()));
        m.put("timestamp", Tools.dateTimeFromString(timeStamp));
        Message message = this.messageFactory.createMessage(m);
        ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("stream-" + streamId + "-test-match-%d").build());
        StreamRouterEngine streamRouterEngine = this.streamRouterEngineFactory.create(Lists.newArrayList((Object[])new Stream[]{stream}), executor);
        List<StreamRouterEngine.StreamTestMatch> streamTestMatches = streamRouterEngine.testMatch(message);
        StreamRouterEngine.StreamTestMatch streamTestMatch = streamTestMatches.get(0);
        HashMap rules = Maps.newHashMap();
        for (Map.Entry<StreamRule, Boolean> match : streamTestMatch.getMatches().entrySet()) {
            rules.put(match.getKey().getId(), match.getValue());
        }
        return TestMatchResponse.create(streamTestMatch.isMatched(), rules);
    }

    @POST
    @Path(value="/{streamId}/clone")
    @Timed
    @ApiOperation(value="Clone a stream", response=StreamCreatedResponse.class)
    @ApiResponses(value={@ApiResponse(code=404, message="Stream not found."), @ApiResponse(code=400, message="Invalid or missing Stream id.")})
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    @AuditEvent(type="server:stream:create")
    public Response cloneStream(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") String streamId, @ApiParam(name="JSON body", required=true) @Valid @NotNull CloneStreamRequest cr, @Context UserContext userContext) throws ValidationException, NotFoundException {
        this.checkPermission("streams:create");
        this.checkPermission("streams:read", streamId);
        this.checkNotEditableStream(streamId, "The stream cannot be cloned.");
        Stream sourceStream = this.streamService.load(streamId);
        String creatorUser = this.getCurrentUser().getName();
        List<StreamRule> sourceStreamRules = this.streamRuleService.loadForStream(sourceStream);
        Set<StreamRule> newStreamRules = sourceStreamRules.stream().map(streamRule -> this.streamRuleService.copy(null, (StreamRule)streamRule)).collect(Collectors.toSet());
        Map<String, Object> streamData = Map.of(DEFAULT_SORT_FIELD, cr.title().strip(), "description", cr.description(), "creator_user_id", creatorUser, "created_at", Tools.nowUTC(), "matching_type", sourceStream.getMatchingType().toString(), "remove_matches_from_default_stream", cr.removeMatchesFromDefaultStream(), "disabled", true, "index_set_id", cr.indexSetId());
        Stream stream = this.streamService.create(streamData);
        String savedStreamId = this.streamService.saveWithRulesAndOwnership(stream, newStreamRules, userContext.getUser());
        ObjectId savedStreamObjectId = new ObjectId(savedStreamId);
        Set<ObjectId> outputIds = sourceStream.getOutputs().stream().map(Output::getId).map(ObjectId::new).collect(Collectors.toSet());
        this.streamService.addOutputs(savedStreamObjectId, outputIds);
        StreamCreatedResponse result = new StreamCreatedResponse(savedStreamId);
        URI streamUri = this.getUriBuilderToSelf().path(StreamResource.class).path("{streamId}").build(new Object[]{savedStreamId});
        return Response.created((URI)streamUri).entity((Object)result).build();
    }

    @GET
    @Path(value="/{streamId}/pipelines")
    @ApiOperation(value="Get pipelines associated with a stream")
    @Produces(value={"application/json"})
    public List<PipelineCompactSource> getConnectedPipelines(@ApiParam(name="streamId", required=true) @PathParam(value="streamId") String streamId) throws NotFoundException {
        PipelineConnections pipelineConnections = this.pipelineStreamConnectionsService.load(streamId);
        ArrayList<PipelineCompactSource> list = new ArrayList<PipelineCompactSource>();
        for (String id : pipelineConnections.pipelineIds()) {
            PipelineDao pipelineDao = this.pipelineService.load(id);
            list.add(PipelineCompactSource.create(pipelineDao.id(), pipelineDao.title()));
        }
        return list;
    }

    @PUT
    @Path(value="/indexSet/{indexSetId}")
    @Timed
    @ApiOperation(value="Assign multiple streams to index set")
    @ApiResponses(value={@ApiResponse(code=404, message="Index set not found.")})
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    @AuditEvent(type="server:stream:update")
    public Response assignToIndexSet(@ApiParam(name="indexSetId", required=true) @PathParam(value="indexSetId") String indexSetId, @ApiParam(name="JSON body", required=true) @Valid @NotNull List<String> streamIds) {
        this.checkPermission("indexsets:read", indexSetId);
        streamIds.forEach(streamId -> {
            this.checkPermission("streams:edit", (String)streamId);
            this.checkNotEditableStream((String)streamId, "The stream with id <" + streamId + "> cannot be edited.");
        });
        Set existingStreams = this.streamService.loadByIds(streamIds).stream().map(Persisted::getId).collect(Collectors.toSet());
        Sets.SetView missingStreams = Sets.difference(new HashSet<String>(streamIds), existingStreams);
        if (!missingStreams.isEmpty()) {
            return Response.status((Response.Status)Response.Status.NOT_FOUND).entity((Object)("Missing streams: " + (Set)missingStreams)).build();
        }
        return this.indexSetRegistry.get(indexSetId).map(indexSet -> {
            this.checkIndexSet((IndexSet)indexSet);
            this.streamService.addToIndexSet(indexSetId, streamIds);
            return Response.ok().build();
        }).orElse(Response.status((Response.Status)Response.Status.NOT_FOUND).build());
    }

    private void checkIndexSet(IndexSet indexSet) {
        if (!indexSet.getConfig().isWritable()) {
            throw new BadRequestException("Assigned index set must be writable!");
        }
        if (!indexSet.getConfig().isRegularIndex()) {
            throw new BadRequestException("Assigned index set is not usable");
        }
    }

    private StreamResponse streamToResponse(Stream stream) {
        return StreamResponse.create(stream.getId(), (String)stream.getFields().get("creator_user_id"), this.outputsToSummaries(stream.getOutputs()), stream.getMatchingType().name(), stream.getDescription(), stream.getFields().get("created_at").toString(), stream.getDisabled(), stream.getStreamRules(), stream.getTitle(), stream.getContentPack(), stream.isDefaultStream(), stream.getRemoveMatchesFromDefaultStream(), stream.getIndexSetId(), stream.getCategories());
    }

    private Collection<OutputSummary> outputsToSummaries(Collection<Output> outputs) {
        return outputs.stream().map(output -> OutputSummary.create(output.getId(), output.getTitle(), output.getType(), output.getCreatorUserId(), new DateTime((Object)output.getCreatedAt()), output.getConfiguration(), output.getContentPack())).collect(Collectors.toSet());
    }

    private void checkNotEditableStream(String streamId, String message) {
        if ("000000000000000000000001".equals(streamId) || !Stream.streamIsEditable(streamId)) {
            throw new BadRequestException(message);
        }
    }
}

