/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.messageq.localkafka;

import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.graylog2.plugin.Message;
import org.graylog2.shared.journal.LocalKafkaJournal;
import org.graylog2.shared.messageq.AbstractMessageQueueAcknowledger;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;

@Singleton
public class LocalKafkaMessageQueueAcknowledger
extends AbstractMessageQueueAcknowledger<Long> {
    private final LocalKafkaJournal kafkaJournal;

    @Inject
    public LocalKafkaMessageQueueAcknowledger(LocalKafkaJournal kafkaJournal, MessageQueueAcknowledger.Metrics metrics) {
        super(Long.class, metrics);
        this.kafkaJournal = kafkaJournal;
    }

    @Override
    public void acknowledge(List<Message> messages) {
        Optional<Long> max = messages.stream().map(Message::getMessageQueueId).filter(x$0 -> this.isValidMessageQueueId(x$0)).map(Long.class::cast).max(Long::compare);
        max.ifPresent(this::doAcknowledge);
        this.metrics.acknowledgedMessages().mark((long)messages.size());
    }

    @Override
    protected void doAcknowledge(Long queueId) {
        this.kafkaJournal.markJournalOffsetCommitted(queueId);
    }
}

