/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.system.processing;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Named;
import org.bson.types.ObjectId;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.MongoDBUpsertRetryer;
import org.graylog2.plugin.BaseConfiguration;
import org.graylog2.plugin.indexer.searches.timeranges.TimeRange;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.system.processing.ProcessingStatusDto;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.mongojack.DBCursor;
import org.mongojack.DBQuery;
import org.mongojack.DBSort;
import org.mongojack.JacksonDBCollection;

/**
 * Persists and queries per-node processing status documents in the
 * {@value #COLLECTION_NAME} MongoDB collection.
 *
 * <p>Each node writes a single document keyed by its node id (enforced by a unique index).
 * The documents of all recently updated nodes are evaluated by
 * {@link #calculateProcessingState(TimeRange)}.
 */
public class DBProcessingStatusService {
    public static final String COLLECTION_NAME = "processing_status";
    private static final String FIELD_NODE_ID = "node_id";
    private static final String FIELD_UPDATED_AT = "updated_at";
    private static final String FIELD_WRITTEN_MESSAGES_1M = "input_journal.written_messages_1m_rate";
    private static final String FIELD_UNCOMMITTED_ENTRIES = "input_journal.uncommitted_entries";
    /**
     * Name of a legacy index that is removed on startup. It equals the name MongoDB generates
     * by default for the key pattern that is re-created below under the name {@code compound_0}.
     */
    private static final String OLD_INDEX_NAME =
            "updated_at_1_input_journal.uncommitted_entries_1_input_journal.written_messages_1m_rate_1";
    private static final String COMPOUND_INDEX_NAME = "compound_0";

    private final String nodeId;
    private final JobSchedulerClock clock;
    private final Duration updateThreshold;
    private final JacksonDBCollection<ProcessingStatusDto, ObjectId> db;
    private final BaseConfiguration baseConfiguration;

    /**
     * Wraps the status collection and makes sure the required indices exist.
     *
     * @param mongoConnection           connection used to obtain the status collection
     * @param nodeId                    identity of the local node; its string id keys this node's document
     * @param clock                     clock used to compute the "recently updated" cut-off
     * @param updateThreshold           maximum age of a status document for its node to count as active
     * @param journalWriteRateThreshold not used by this class; kept so the injected constructor
     *                                  signature stays unchanged
     * @param mapper                    provider of the object mapper used for (de)serialization
     * @param baseConfiguration         configuration consulted for the message-journal-enabled flag
     */
    @Inject
    public DBProcessingStatusService(MongoConnection mongoConnection, NodeId nodeId, JobSchedulerClock clock, @Named(value="processing_status_update_threshold") Duration updateThreshold, @Named(value="processing_status_journal_write_rate_threshold") int journalWriteRateThreshold, MongoJackObjectMapperProvider mapper, BaseConfiguration baseConfiguration) {
        this.nodeId = nodeId.getNodeId();
        this.clock = clock;
        this.updateThreshold = updateThreshold;
        this.baseConfiguration = baseConfiguration;
        this.db = JacksonDBCollection.wrap((DBCollection)mongoConnection.getDatabase().getCollection(COLLECTION_NAME), ProcessingStatusDto.class, ObjectId.class, (ObjectMapper)mapper.get());

        // One status document per node.
        this.db.createIndex(new BasicDBObject(FIELD_NODE_ID, 1), new BasicDBObject("unique", true));

        dropLegacyIndex();

        final BasicDBObject compoundKeys = new BasicDBObject(FIELD_UPDATED_AT, 1)
                .append(FIELD_UNCOMMITTED_ENTRIES, 1)
                .append(FIELD_WRITTEN_MESSAGES_1M, 1);
        this.db.createIndex(compoundKeys, new BasicDBObject("name", COMPOUND_INDEX_NAME));
    }

    /**
     * Drops the index named {@link #OLD_INDEX_NAME} if it is present.
     *
     * <p>NOTE(review): presumably required because MongoDB refuses to create an index with an
     * identical key pattern under a different name - confirm against the supported MongoDB versions.
     */
    private void dropLegacyIndex() {
        try {
            // Constant-first comparison so an index document without a "name" cannot cause an NPE.
            final boolean legacyIndexPresent = this.db.getIndexInfo().stream()
                    .anyMatch(indexInfo -> OLD_INDEX_NAME.equals(indexInfo.get("name")));
            if (legacyIndexPresent) {
                this.db.dropIndex(OLD_INDEX_NAME);
            }
        } catch (MongoException ignored) {
            // Deliberately best-effort: failing to list or drop the legacy index must not prevent
            // construction. Any real conflict would surface from the createIndex call that follows.
        }
    }

    /**
     * Returns the status documents of all nodes, sorted ascending by {@code _id}.
     *
     * @return an immutable snapshot of every document in the collection
     */
    public List<ProcessingStatusDto> all() {
        try (DBCursor<ProcessingStatusDto> cursor = this.db.find().sort(DBSort.asc("_id"))) {
            return ImmutableList.copyOf(cursor.iterator());
        }
    }

    /**
     * Looks up the status document of the local node.
     *
     * @return the local node's status, or an empty optional if none has been stored
     */
    public Optional<ProcessingStatusDto> get() {
        return Optional.ofNullable(this.db.findOne(DBQuery.is(FIELD_NODE_ID, this.nodeId)));
    }

    /**
     * Stores the current values of the given recorder for the local node, using the current
     * UTC time as the update timestamp.
     *
     * @param processingStatusRecorder source of the status values to persist
     * @return the document returned by the find-and-modify operation
     */
    public ProcessingStatusDto save(ProcessingStatusRecorder processingStatusRecorder) {
        return this.save(processingStatusRecorder, DateTime.now(DateTimeZone.UTC));
    }

    /**
     * Stores the current values of the given recorder for the local node with an explicit
     * update timestamp.
     *
     * @param processingStatusRecorder source of the status values to persist
     * @param updatedAt                timestamp to record as the time of this update
     * @return the document returned by the find-and-modify operation
     */
    @VisibleForTesting
    ProcessingStatusDto save(ProcessingStatusRecorder processingStatusRecorder, DateTime updatedAt) {
        // The trailing (true, true) arguments are MongoJack's returnNew and upsert flags: the
        // document is created if missing and the stored version is returned.
        return MongoDBUpsertRetryer.run(() -> this.db.findAndModify(
                DBQuery.is(FIELD_NODE_ID, this.nodeId),
                null,
                null,
                false,
                ProcessingStatusDto.of(this.nodeId, processingStatusRecorder, updatedAt, this.baseConfiguration.isMessageJournalEnabled()),
                true,
                true));
    }

    /**
     * Classifies the cluster's processing state relative to the given time range, looking only
     * at nodes whose status was updated within the configured update threshold.
     *
     * @param timeRange the time range to compare each node's last post-indexing receive time against
     * @return <ul>
     *         <li>{@link ProcessingNodesState#NONE_ACTIVE} if no node updated its status recently,</li>
     *         <li>{@link ProcessingNodesState#SOME_OVERLOADED} as soon as one node is behind the end
     *             of the range and still has pending work,</li>
     *         <li>{@link ProcessingNodesState#ALL_IDLE} if every active node's last indexed message
     *             is before the start of the range,</li>
     *         <li>{@link ProcessingNodesState#SOME_UP_TO_DATE} otherwise.</li>
     *         </ul>
     */
    public ProcessingNodesState calculateProcessingState(TimeRange timeRange) {
        final DateTime updateThresholdTimestamp = this.clock.nowUTC().minus(this.updateThreshold.toMilliseconds());
        try (DBCursor<ProcessingStatusDto> statusCursor = this.db.find(this.activeNodes(updateThresholdTimestamp))) {
            if (!statusCursor.hasNext()) {
                return ProcessingNodesState.NONE_ACTIVE;
            }

            int activeNodes = 0;
            int idleNodes = 0;
            while (statusCursor.hasNext()) {
                activeNodes++;
                final ProcessingStatusDto nodeProcessingStatus = statusCursor.next();
                final DateTime lastIndexedMessage = nodeProcessingStatus.receiveTimes().postIndexing();

                // Behind the end of the range and still busy: the node is overloaded.
                if (lastIndexedMessage.isBefore(timeRange.getTo()) && this.isBusy(nodeProcessingStatus)) {
                    return ProcessingNodesState.SOME_OVERLOADED;
                }

                // Has not indexed anything at or after the start of the range: counted as idle.
                if (lastIndexedMessage.isBefore(timeRange.getFrom())) {
                    idleNodes++;
                }
            }

            if (activeNodes == idleNodes) {
                return ProcessingNodesState.ALL_IDLE;
            }
        }
        return ProcessingNodesState.SOME_UP_TO_DATE;
    }

    /** A node is busy if it has uncommitted journal entries or a non-empty process buffer. */
    private boolean isBusy(ProcessingStatusDto nodeProcessingStatus) {
        return nodeProcessingStatus.inputJournal().uncommittedEntries() > 0L || nodeProcessingStatus.processBufferUsage() > 0L;
    }

    /** Builds the query selecting documents updated strictly after the given timestamp. */
    private DBQuery.Query activeNodes(DateTime updateThresholdTimestamp) {
        return DBQuery.greaterThan(FIELD_UPDATED_AT, updateThresholdTimestamp);
    }

    /** Result of {@link #calculateProcessingState(TimeRange)}. */
    public enum ProcessingNodesState {
        /** No node has updated its status within the update threshold. */
        NONE_ACTIVE,
        /** Every active node's last indexed message lies before the start of the time range. */
        ALL_IDLE,
        /** No active node is overloaded and at least one is not idle for the time range. */
        SOME_UP_TO_DATE,
        /** At least one active node is behind the end of the time range and still busy. */
        SOME_OVERLOADED
    }
}

