/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.messageq.localkafka;

import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.graylog2.plugin.Message;
import org.graylog2.shared.journal.LocalKafkaJournal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MessageQueueAcknowledger} that acknowledges messages by committing their
 * message queue ids as offsets on the {@link LocalKafkaJournal}.
 *
 * <p>Only ids that are instances of {@link Long} are committed. Any other id
 * (including {@code null}) is rejected: the single-id entry points log an error,
 * the batch entry point silently filters such ids out.
 *
 * <p>NOTE(review): the acknowledged-messages meter is marked by every entry point
 * regardless of whether an offset was actually committed. This mirrors the existing
 * behaviour; confirm whether rejected ids are meant to be counted.
 */
@Singleton
public class LocalKafkaMessageQueueAcknowledger implements MessageQueueAcknowledger {
    private static final Logger LOG = LoggerFactory.getLogger(LocalKafkaMessageQueueAcknowledger.class);

    // Both collaborators are assigned exactly once, in the constructor.
    private final LocalKafkaJournal kafkaJournal;
    private final MessageQueueAcknowledger.Metrics metrics;

    /**
     * Creates the acknowledger.
     *
     * @param kafkaJournal journal on which offsets are marked as committed
     * @param metrics      metrics holder whose acknowledged-messages meter is marked on every acknowledge call
     */
    @Inject
    public LocalKafkaMessageQueueAcknowledger(LocalKafkaJournal kafkaJournal, MessageQueueAcknowledger.Metrics metrics) {
        this.kafkaJournal = kafkaJournal;
        this.metrics = metrics;
    }

    /**
     * Acknowledges a single message queue id and marks the meter once.
     *
     * @param offset the message queue id; committed only if it is a {@link Long},
     *               otherwise an error is logged and nothing is committed
     */
    @Override
    public void acknowledge(Object offset) {
        doAcknowledge(offset);
        metrics.acknowledgedMessages().mark();
    }

    /**
     * Acknowledges a single message via its message queue id and marks the meter once.
     *
     * @param message the message whose {@code getMessageQueueId()} is committed
     */
    @Override
    public void acknowledge(Message message) {
        doAcknowledge(message.getMessageQueueId());
        metrics.acknowledgedMessages().mark();
    }

    /**
     * Acknowledges a batch of messages by committing only the largest {@link Long}
     * message queue id found in the batch, then marks the meter by the batch size.
     *
     * <p>Messages whose id is not a {@link Long} are ignored when determining the
     * maximum; if no message carries a {@link Long} id, nothing is committed.
     * Presumably committing the highest offset covers all lower ones - confirm
     * against {@link LocalKafkaJournal}.
     *
     * @param messages the messages to acknowledge
     */
    @Override
    public void acknowledge(List<Message> messages) {
        Optional<Long> highestOffset = messages.stream()
                .map(Message::getMessageQueueId)
                .filter(Long.class::isInstance)
                .map(Long.class::cast)
                .max(Long::compare);
        highestOffset.ifPresent(this::doAcknowledge);
        // The meter counts every message in the batch, not just those with a Long id.
        metrics.acknowledgedMessages().mark(messages.size());
    }

    /**
     * Commits the given id on the journal if it is a {@link Long}; otherwise logs
     * an error and returns without committing.
     *
     * @param object candidate journal offset
     */
    private void doAcknowledge(Object object) {
        if (!(object instanceof Long)) {
            // Parameterized logging: same rendered message, no eager string building.
            LOG.error("Couldn't acknowledge message. Expected <{}> to be of type Long", object);
            return;
        }
        long offset = (Long) object;
        kafkaJournal.markJournalOffsetCommitted(offset);
    }
}

