/*
 * Decompiled with CFR 0.152.
 */
package com.mware.core.ingest.dataworker;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.mware.core.bootstrap.InjectHelper;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.dataworker.DataWorker;
import com.mware.core.ingest.dataworker.DataWorkerData;
import com.mware.core.ingest.dataworker.DataWorkerInitializer;
import com.mware.core.ingest.dataworker.DataWorkerItem;
import com.mware.core.ingest.dataworker.DataWorkerMessage;
import com.mware.core.ingest.dataworker.DataWorkerPrepareData;
import com.mware.core.ingest.dataworker.DataWorkerThreadedWrapper;
import com.mware.core.ingest.dataworker.ElementOrPropertyStatus;
import com.mware.core.ingest.dataworker.TermMentionFilter;
import com.mware.core.ingest.dataworker.TermMentionFilterPrepareData;
import com.mware.core.ingest.dataworker.VerifyResults;
import com.mware.core.model.WorkerBase;
import com.mware.core.model.properties.BcSchema;
import com.mware.core.model.role.AuthorizationRepository;
import com.mware.core.model.user.UserRepository;
import com.mware.core.model.workQueue.Priority;
import com.mware.core.model.workQueue.WebQueueRepository;
import com.mware.core.model.workQueue.WorkQueueRepository;
import com.mware.core.security.VisibilityTranslator;
import com.mware.core.status.MetricsManager;
import com.mware.core.status.StatusRepository;
import com.mware.core.status.StatusServer;
import com.mware.core.status.model.DataWorkerRunnerStatus;
import com.mware.core.status.model.ProcessStatus;
import com.mware.core.user.User;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.mware.core.util.ServiceLoaderUtil;
import com.mware.core.util.StoppableRunnable;
import com.mware.core.util.TeeInputStream;
import com.mware.ge.Authorizations;
import com.mware.ge.Edge;
import com.mware.ge.Element;
import com.mware.ge.FetchHints;
import com.mware.ge.Graph;
import com.mware.ge.Property;
import com.mware.ge.Vertex;
import com.mware.ge.util.IterableUtils;
import com.mware.ge.values.storable.StreamingPropertyValue;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;

public class DataWorkerRunner
extends WorkerBase<DataWorkerItem> {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(DataWorkerRunner.class);
    private final StatusRepository statusRepository;
    private final AuthorizationRepository authorizationRepository;
    private Graph graph;
    private Authorizations authorizations;
    private List<DataWorkerThreadedWrapper> workerWrappers = Lists.newArrayList();
    private User user;
    private UserRepository userRepository;
    private Configuration configuration;
    private VisibilityTranslator visibilityTranslator;
    private AtomicLong lastProcessedPropertyTime = new AtomicLong(0L);
    private List<DataWorker> dataWorkers = Lists.newArrayList();
    private boolean prepareWorkersCalled;
    private final String queueName;

    @Inject
    public DataWorkerRunner(WorkQueueRepository workQueueRepository, WebQueueRepository webQueueRepository, StatusRepository statusRepository, Configuration configuration, MetricsManager metricsManager, AuthorizationRepository authorizationRepository) {
        super(workQueueRepository, webQueueRepository, configuration, metricsManager);
        this.statusRepository = statusRepository;
        this.authorizationRepository = authorizationRepository;
        this.queueName = configuration.get("dw.queue.name", "dwQueue");
    }

    @Override
    protected DataWorkerItem tupleDataToWorkerItem(byte[] data) {
        DataWorkerMessage message = DataWorkerMessage.create(data);
        return new DataWorkerItem(message, this.getElements(message));
    }

    @Override
    protected String getQueueName() {
        return this.queueName;
    }

    @Override
    public void process(DataWorkerItem workerItem) throws Exception {
        DataWorkerMessage message = workerItem.getMessage();
        if (message.getProperties() != null && message.getProperties().length > 0) {
            this.safeExecuteHandlePropertiesOnElements(workerItem);
        } else if (message.getPropertyName() != null) {
            this.safeExecuteHandlePropertyOnElements(workerItem);
        } else {
            this.safeExecuteHandleAllEntireElements(workerItem);
        }
    }

    public void prepare(User user) {
        this.prepare(user, new DataWorkerInitializer());
    }

    public void prepare(User user, DataWorkerInitializer repository) {
        this.setUser(user);
        this.setAuthorizations(this.authorizationRepository.getGraphAuthorizations(user, new String[0]));
        this.prepareWorkers(repository);
        this.getWorkQueueRepository().setDataWorkerRunner(this);
    }

    public void prepareWorkers(DataWorkerInitializer initializer) {
        if (this.prepareWorkersCalled) {
            throw new BcException("prepareWorkers should be called only once");
        }
        this.prepareWorkersCalled = true;
        List<TermMentionFilter> termMentionFilters = this.loadTermMentionFilters();
        DataWorkerPrepareData workerPrepareData = new DataWorkerPrepareData(this.configuration.toMap(), termMentionFilters, this.user, this.authorizations, InjectHelper.getInjector());
        Collection<DataWorker> workers = this.getAvailableWorkers();
        for (DataWorker worker : workers) {
            try {
                LOGGER.debug("verifying: %s", worker.getClass().getName());
                VerifyResults verifyResults = worker.verify();
                if (verifyResults != null && verifyResults.getFailures().size() > 0) {
                    LOGGER.error("data worker %s had errors verifying", worker.getClass().getName());
                    for (VerifyResults.Failure failure : verifyResults.getFailures()) {
                        LOGGER.error("  %s", failure.getMessage());
                    }
                }
                if (initializer == null) continue;
                initializer.initialize(worker);
            }
            catch (Exception ex) {
                LOGGER.error("Could not verify data worker %s", worker.getClass().getName(), ex);
            }
        }
        ArrayList wrappers = Lists.newArrayList();
        for (DataWorker worker : workers) {
            try {
                LOGGER.debug("preparing: %s", worker.getClass().getName());
                worker.prepare(workerPrepareData);
                this.dataWorkers.add(worker);
            }
            catch (Exception ex) {
                LOGGER.error("Could not prepare data worker %s", worker.getClass().getName(), ex);
            }
            DataWorkerThreadedWrapper wrapper = new DataWorkerThreadedWrapper(worker);
            this.setupWrapper(wrapper);
            wrappers.add(wrapper);
            Thread thread = new Thread(wrapper);
            String workerName = worker.getClass().getName();
            thread.setName("dataWorker-" + workerName);
            thread.start();
        }
        this.addDataWorkerThreadedWrappers(wrappers);
    }

    protected DataWorkerThreadedWrapper setupWrapper(DataWorkerThreadedWrapper wrapper) {
        return InjectHelper.inject(wrapper);
    }

    protected Collection<DataWorker> getAvailableWorkers() {
        return InjectHelper.getInjectedServices(DataWorker.class, this.configuration);
    }

    public void addDataWorkerThreadedWrappers(List<DataWorkerThreadedWrapper> wrappers) {
        this.workerWrappers.addAll(wrappers);
    }

    public void addDataWorkerThreadedWrappers(DataWorkerThreadedWrapper ... wrappers) {
        this.workerWrappers.addAll(Lists.newArrayList((Object[])wrappers));
    }

    private List<TermMentionFilter> loadTermMentionFilters() {
        TermMentionFilterPrepareData termMentionFilterPrepareData = new TermMentionFilterPrepareData(this.configuration.toMap(), this.user, this.authorizations, InjectHelper.getInjector());
        List<TermMentionFilter> termMentionFilters = IterableUtils.toList(ServiceLoaderUtil.load(TermMentionFilter.class, this.configuration));
        for (TermMentionFilter termMentionFilter : termMentionFilters) {
            try {
                termMentionFilter.prepare(termMentionFilterPrepareData);
            }
            catch (Exception ex) {
                throw new BcException("Could not initialize term mention filter: " + termMentionFilter.getClass().getName(), ex);
            }
        }
        return termMentionFilters;
    }

    @Override
    protected StatusServer createStatusServer() throws Exception {
        return new StatusServer(this.configuration, this.statusRepository, "dataWorker", DataWorkerRunner.class){

            @Override
            protected ProcessStatus createStatus() {
                DataWorkerRunnerStatus status = new DataWorkerRunnerStatus();
                for (DataWorkerThreadedWrapper dataWorkerThreadedWrapper : DataWorkerRunner.this.workerWrappers) {
                    status.getRunningWorkers().add(dataWorkerThreadedWrapper.getStatus());
                }
                return status;
            }
        };
    }

    private void safeExecuteHandleAllEntireElements(DataWorkerItem workerItem) throws Exception {
        for (Element element : workerItem.getElements()) {
            this.safeExecuteHandleEntireElement(element, workerItem.getMessage());
        }
    }

    private void safeExecuteHandleEntireElement(Element element, DataWorkerMessage message) throws Exception {
        this.safeExecuteHandlePropertyOnElement(element, null, message);
        for (Property property : element.getProperties()) {
            this.safeExecuteHandlePropertyOnElement(element, property, message);
        }
    }

    private ImmutableList<Element> getVerticesFromMessage(DataWorkerMessage message) {
        ImmutableList.Builder vertices = ImmutableList.builder();
        for (String vertexId : message.getGraphVertexId()) {
            Vertex vertex = message.getStatus() == ElementOrPropertyStatus.DELETION || message.getStatus() == ElementOrPropertyStatus.HIDDEN ? this.graph.getVertex(vertexId, FetchHints.ALL, message.getBeforeActionTimestamp(), this.authorizations) : this.graph.getVertex(vertexId, FetchHints.ALL, this.authorizations);
            if (this.doesExist(vertex)) {
                vertices.add((Object)vertex);
                continue;
            }
            LOGGER.warn("Could not find vertex with id %s", vertexId);
        }
        return vertices.build();
    }

    private ImmutableList<Element> getEdgesFromMessage(DataWorkerMessage message) {
        ImmutableList.Builder edges = ImmutableList.builder();
        for (String edgeId : message.getGraphEdgeId()) {
            Edge edge = message.getStatus() == ElementOrPropertyStatus.DELETION || message.getStatus() == ElementOrPropertyStatus.HIDDEN ? this.graph.getEdge(edgeId, FetchHints.ALL, message.getBeforeActionTimestamp(), this.authorizations) : this.graph.getEdge(edgeId, FetchHints.ALL, this.authorizations);
            if (this.doesExist(edge)) {
                edges.add((Object)edge);
                continue;
            }
            LOGGER.warn("Could not find edge with id %s", edgeId);
        }
        return edges.build();
    }

    private boolean doesExist(Element element) {
        return element != null;
    }

    private void safeExecuteHandlePropertiesOnElements(DataWorkerItem workerItem) throws Exception {
        DataWorkerMessage message = workerItem.getMessage();
        for (Element element : workerItem.getElements()) {
            for (DataWorkerMessage.Property propertyMessage : message.getProperties()) {
                Property property = null;
                String propertyKey = propertyMessage.getPropertyKey();
                String propertyName = propertyMessage.getPropertyName();
                if ((StringUtils.isNotEmpty((String)propertyKey) || StringUtils.isNotEmpty((String)propertyName)) && (property = propertyKey == null ? element.getProperty(propertyName) : element.getProperty(propertyKey, propertyName)) == null) {
                    LOGGER.debug("Could not find property [%s]:[%s] on vertex with id %s", propertyKey, propertyName, element.getId());
                    continue;
                }
                this.safeExecuteHandlePropertyOnElement(element, property, message.getWorkspaceId(), message.getVisibilitySource(), message.getPriority(), message.isTraceEnabled(), propertyMessage.getStatus(), propertyMessage.getBeforeActionTimestampOrDefault());
            }
        }
    }

    private void safeExecuteHandlePropertyOnElements(DataWorkerItem workerItem) throws Exception {
        DataWorkerMessage message = workerItem.getMessage();
        for (Element element : workerItem.getElements()) {
            Property property = this.getProperty(element, message);
            if (property != null) {
                this.safeExecuteHandlePropertyOnElement(element, property, message);
                continue;
            }
            LOGGER.debug("Could not find property [%s]:[%s] on vertex with id %s", message.getPropertyKey(), message.getPropertyName(), element.getId());
        }
    }

    private Property getProperty(Element element, DataWorkerMessage message) {
        if (message.getPropertyName() == null) {
            return null;
        }
        Iterable<Property> properties = message.getPropertyKey() == null ? element.getProperties(message.getPropertyName()) : element.getProperties(message.getPropertyKey(), message.getPropertyName());
        Property result = null;
        for (Property property : properties) {
            if (message.getWorkspaceId() != null && property.getVisibility().hasAuthorization(message.getWorkspaceId())) {
                result = property;
                continue;
            }
            if (result != null) continue;
            result = property;
        }
        return result;
    }

    private void safeExecuteHandlePropertyOnElement(Element element, Property property, DataWorkerMessage message) throws Exception {
        this.safeExecuteHandlePropertyOnElement(element, property, message.getWorkspaceId(), message.getVisibilitySource(), message.getPriority(), message.isTraceEnabled(), message.getStatus(), message.getBeforeActionTimestampOrDefault());
    }

    private void safeExecuteHandlePropertyOnElement(Element element, Property property, String workspaceId, String visibilitySource, Priority priority, boolean traceEnabled, ElementOrPropertyStatus status, long beforeActionTimestamp) throws Exception {
        String propertyText = this.getPropertyText(property);
        List<DataWorkerThreadedWrapper> interestedWorkerWrappers = this.findInterestedWorkers(element, property, status);
        if (interestedWorkerWrappers.size() == 0) {
            LOGGER.debug("Could not find interested workers for %s %s property %s (%s)", new Object[]{element instanceof Vertex ? "vertex" : "edge", element.getId(), propertyText, status});
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            for (DataWorkerThreadedWrapper interestedWorkerWrapper : interestedWorkerWrappers) {
                LOGGER.debug("interested worker for %s %s property %s: %s (%s)", new Object[]{element instanceof Vertex ? "vertex" : "edge", element.getId(), propertyText, interestedWorkerWrapper.getWorker().getClass().getName(), status});
            }
        }
        DataWorkerData workData = new DataWorkerData(this.visibilityTranslator, element, property, workspaceId, visibilitySource, priority, traceEnabled, beforeActionTimestamp, status);
        LOGGER.debug("Begin work on element %s property %s", element.getId(), propertyText);
        if (property != null && property.getValue() instanceof StreamingPropertyValue) {
            StreamingPropertyValue spb = (StreamingPropertyValue)property.getValue();
            this.safeExecuteStreamingPropertyValue(interestedWorkerWrappers, workData, spb);
        } else {
            this.safeExecuteNonStreamingProperty(interestedWorkerWrappers, workData);
        }
        this.lastProcessedPropertyTime.set(System.currentTimeMillis());
        this.graph.flush();
        LOGGER.debug("Completed work on %s", propertyText);
    }

    private String getPropertyText(Property property) {
        return property == null ? "[none]" : property.getKey() + ":" + property.getName();
    }

    private void safeExecuteNonStreamingProperty(List<DataWorkerThreadedWrapper> interestedWorkerWrappers, DataWorkerData workData) throws Exception {
        for (DataWorkerThreadedWrapper interestedWorkerWrapper1 : interestedWorkerWrappers) {
            interestedWorkerWrapper1.enqueueWork(null, workData);
        }
        for (DataWorkerThreadedWrapper interestedWorkerWrapper : interestedWorkerWrappers) {
            interestedWorkerWrapper.dequeueResult(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void safeExecuteStreamingPropertyValue(List<DataWorkerThreadedWrapper> interestedWorkerWrappers, DataWorkerData workData, StreamingPropertyValue streamingPropertyValue) throws Exception {
        InputStream in;
        block6: {
            String[] workerNames = this.dataWorkerThreadedWrapperToNames(interestedWorkerWrappers);
            in = streamingPropertyValue.getInputStream();
            File tempFile = null;
            try {
                boolean requiresLocalFile = this.isLocalFileRequired(interestedWorkerWrappers);
                if (requiresLocalFile) {
                    tempFile = this.copyToTempFile(in, workData);
                    in = new FileInputStream(tempFile);
                }
                TeeInputStream teeInputStream = new TeeInputStream(in, workerNames);
                for (int i = 0; i < interestedWorkerWrappers.size(); ++i) {
                    interestedWorkerWrappers.get(i).enqueueWork(teeInputStream.getTees()[i], workData);
                }
                teeInputStream.loopUntilTeesAreClosed();
                for (DataWorkerThreadedWrapper interestedWorkerWrapper : interestedWorkerWrappers) {
                    interestedWorkerWrapper.dequeueResult(false);
                }
                if (tempFile == null || tempFile.delete()) break block6;
            }
            catch (Throwable throwable) {
                if (tempFile != null && !tempFile.delete()) {
                    LOGGER.warn("Could not delete temp file %s", tempFile.getAbsolutePath());
                }
                in.close();
                throw throwable;
            }
            LOGGER.warn("Could not delete temp file %s", tempFile.getAbsolutePath());
        }
        in.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private File copyToTempFile(InputStream in, DataWorkerData workData) throws IOException {
        String fileExt = null;
        String fileName = (String)BcSchema.FILE_NAME.getOnlyPropertyValue(workData.getElement());
        if (fileName != null) {
            fileExt = FilenameUtils.getExtension((String)fileName);
        }
        if (fileExt == null) {
            fileExt = "data";
        }
        File tempFile = File.createTempFile("dataWorkerBolt", fileExt);
        workData.setLocalFile(tempFile);
        try (FileOutputStream tempFileOut = new FileOutputStream(tempFile);){
            IOUtils.copy((InputStream)in, (OutputStream)tempFileOut);
        }
        finally {
            in.close();
        }
        return tempFile;
    }

    private boolean isLocalFileRequired(List<DataWorkerThreadedWrapper> interestedWorkerWrappers) {
        for (DataWorkerThreadedWrapper worker : interestedWorkerWrappers) {
            if (!worker.getWorker().isLocalFileRequired()) continue;
            return true;
        }
        return false;
    }

    private List<DataWorkerThreadedWrapper> findInterestedWorkers(Element element, Property property, ElementOrPropertyStatus status) {
        Set dataWorkerWhiteList = IterableUtils.toSet(BcSchema.DATA_WORKER_WHITE_LIST.getPropertyValues(element));
        Set dataWorkerBlackList = IterableUtils.toSet(BcSchema.DATA_WORKER_BLACK_LIST.getPropertyValues(element));
        ArrayList<DataWorkerThreadedWrapper> interestedWorkers = new ArrayList<DataWorkerThreadedWrapper>();
        for (DataWorkerThreadedWrapper wrapper : this.workerWrappers) {
            String dataWorkerName = wrapper.getWorker().getClass().getName();
            if (dataWorkerWhiteList.size() > 0 && !dataWorkerWhiteList.contains(dataWorkerName) || dataWorkerBlackList.contains(dataWorkerName)) continue;
            DataWorker worker = wrapper.getWorker();
            if (status == ElementOrPropertyStatus.DELETION) {
                this.addDeletedWorkers(interestedWorkers, worker, wrapper, element, property);
                continue;
            }
            if (status == ElementOrPropertyStatus.HIDDEN) {
                this.addHiddenWorkers(interestedWorkers, worker, wrapper, element, property);
                continue;
            }
            if (status == ElementOrPropertyStatus.UNHIDDEN) {
                this.addUnhiddenWorkers(interestedWorkers, worker, wrapper, element, property);
                continue;
            }
            if (!worker.isHandled(element, property)) continue;
            interestedWorkers.add(wrapper);
        }
        return interestedWorkers;
    }

    private void addDeletedWorkers(List<DataWorkerThreadedWrapper> interestedWorkers, DataWorker worker, DataWorkerThreadedWrapper wrapper, Element element, Property property) {
        if (worker.isDeleteHandled(element, property)) {
            interestedWorkers.add(wrapper);
        }
    }

    private void addHiddenWorkers(List<DataWorkerThreadedWrapper> interestedWorkers, DataWorker worker, DataWorkerThreadedWrapper wrapper, Element element, Property property) {
        if (worker.isHiddenHandled(element, property)) {
            interestedWorkers.add(wrapper);
        }
    }

    private void addUnhiddenWorkers(List<DataWorkerThreadedWrapper> interestedWorkers, DataWorker worker, DataWorkerThreadedWrapper wrapper, Element element, Property property) {
        if (worker.isUnhiddenHandled(element, property)) {
            interestedWorkers.add(wrapper);
        }
    }

    private String[] dataWorkerThreadedWrapperToNames(List<DataWorkerThreadedWrapper> interestedWorkerWrappers) {
        String[] names = new String[interestedWorkerWrappers.size()];
        for (int i = 0; i < names.length; ++i) {
            names[i] = interestedWorkerWrappers.get(i).getWorker().getClass().getName();
        }
        return names;
    }

    private ImmutableList<Element> getElements(DataWorkerMessage message) {
        ImmutableList.Builder results = ImmutableList.builder();
        if (message.getGraphVertexId() != null && message.getGraphVertexId().length > 0) {
            results.addAll(this.getVerticesFromMessage(message));
        }
        if (message.getGraphEdgeId() != null && message.getGraphEdgeId().length > 0) {
            results.addAll(this.getEdgesFromMessage(message));
        }
        return results.build();
    }

    public void shutdown() {
        for (DataWorkerThreadedWrapper wrapper : this.workerWrappers) {
            wrapper.stop();
        }
        super.stop();
    }

    public UserRepository getUserRepository() {
        return this.userRepository;
    }

    @Inject
    public void setUserRepository(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @Inject
    public void setGraph(Graph graph) {
        this.graph = graph;
    }

    @Inject
    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    @Inject
    public void setVisibilityTranslator(VisibilityTranslator visibilityTranslator) {
        this.visibilityTranslator = visibilityTranslator;
    }

    public void setAuthorizations(Authorizations authorizations) {
        this.authorizations = authorizations;
    }

    public long getLastProcessedTime() {
        return this.lastProcessedPropertyTime.get();
    }

    public User getUser() {
        return this.user;
    }

    public void setUser(User user) {
        this.user = user;
    }

    public boolean isStarted() {
        return this.shouldRun();
    }

    public boolean canHandle(Element element, Property property, ElementOrPropertyStatus status) {
        if (!this.isStarted()) {
            return true;
        }
        for (DataWorker worker : this.getAllDataWorkers()) {
            try {
                if (status == ElementOrPropertyStatus.DELETION && worker.isDeleteHandled(element, property)) {
                    return true;
                }
                if (status == ElementOrPropertyStatus.HIDDEN && worker.isHiddenHandled(element, property)) {
                    return true;
                }
                if (status == ElementOrPropertyStatus.UNHIDDEN && worker.isUnhiddenHandled(element, property)) {
                    return true;
                }
                if (!worker.isHandled(element, property)) continue;
                return true;
            }
            catch (Throwable t) {
                LOGGER.warn("Error checking to see if workers will handle graph property message.  Queueing anyways in case there was just a local error", t);
                return true;
            }
        }
        if (property == null) {
            LOGGER.debug("No interested workers for %s so did not queue it", element.getId());
        } else {
            LOGGER.debug("No interested workers for %s %s %s so did not queue it", element.getId(), property.getKey(), property.getValue());
        }
        return false;
    }

    public boolean canHandle(Element element, String propertyKey, String propertyName, ElementOrPropertyStatus status) {
        if (!this.isStarted()) {
            return true;
        }
        Property property = element.getProperty(propertyKey, propertyName);
        return this.canHandle(element, property, status);
    }

    private Collection<DataWorker> getAllDataWorkers() {
        return Lists.newArrayList(this.dataWorkers);
    }

    public static List<StoppableRunnable> startThreaded(int threadCount, final User user) {
        ArrayList<StoppableRunnable> stoppables = new ArrayList<StoppableRunnable>();
        LOGGER.info("Starting DataWorkerRunners on %d threads", threadCount);
        for (int i = 0; i < threadCount; ++i) {
            StoppableRunnable stoppable = new StoppableRunnable(){
                private DataWorkerRunner dataWorkerRunner = null;

                @Override
                public void run() {
                    try {
                        this.dataWorkerRunner = InjectHelper.getInstance(DataWorkerRunner.class);
                        this.dataWorkerRunner.prepare(user);
                        this.dataWorkerRunner.run();
                    }
                    catch (Exception ex) {
                        LOGGER.error("Failed running DataWorkerRunner", ex);
                    }
                }

                @Override
                public void stop() {
                    try {
                        if (this.dataWorkerRunner != null) {
                            LOGGER.debug("Stopping DataWorkerRunner", new Object[0]);
                            this.dataWorkerRunner.stop();
                        }
                    }
                    catch (Exception ex) {
                        LOGGER.error("Failed stopping DataWorkerRunner", ex);
                    }
                }
            };
            stoppables.add(stoppable);
            Thread t = new Thread(stoppable);
            t.setName("data-worker-runner-" + t.getId());
            t.setDaemon(true);
            LOGGER.debug("Starting DataWorkerRunner thread: %s", t.getName());
            t.start();
        }
        return stoppables;
    }
}

