/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.messaging.kafka.basic.consumer;

import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerMessageHandler;
import io.eventuate.messaging.kafka.basic.consumer.KafkaMessageProcessorFailedException;
import io.eventuate.messaging.kafka.basic.consumer.MessageConsumerBacklog;
import io.eventuate.messaging.kafka.basic.consumer.OffsetTracker;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands Kafka records to an {@link EventuateKafkaConsumerMessageHandler} and keeps track of which
 * offsets have been handled so that they can be committed later.
 *
 * <p>Every record passed to {@link #process(ConsumerRecord)} is registered with an
 * {@link OffsetTracker} as unprocessed. When the handler's completion callback reports success,
 * the record is queued; the next call to {@link #offsetsToCommit()} drains that queue into the
 * tracker. When the callback reports a failure, the failure is stored and rethrown by later calls
 * to {@link #process(ConsumerRecord)} or {@link #throwFailureException()}.
 *
 * <p>NOTE(review): results from the completion callback are handed over through a
 * {@link BlockingQueue} and an {@link AtomicReference}, which suggests the callback may run on a
 * different thread than the caller of the public methods. {@code offsetTracker} and
 * {@code consumerBacklogs} are not synchronized in this class; confirm that they are only touched
 * from a single thread.
 */
public class KafkaMessageProcessor {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Identifier of the subscriber; used only in log output by this class. */
    private final String subscriberId;
    /** Handler that receives each record together with a completion callback. */
    private final EventuateKafkaConsumerMessageHandler handler;
    private final OffsetTracker offsetTracker = new OffsetTracker();
    /** Records whose completion callback reported success and that still await {@link #offsetsToCommit()}. */
    private final BlockingQueue<ConsumerRecord<String, byte[]>> processedRecords = new LinkedBlockingQueue<>();
    /** Most recently reported handler failure, or {@code null} if none has been reported. */
    private final AtomicReference<KafkaMessageProcessorFailedException> failed = new AtomicReference<>();
    /** Distinct non-null backlogs returned by the handler; summed by {@link #backlog()}. */
    private final Set<MessageConsumerBacklog> consumerBacklogs = new HashSet<>();

    /**
     * Creates a processor for one subscriber.
     *
     * @param subscriberId identifier included in this class's log messages
     * @param handler handler that each record is passed to
     */
    public KafkaMessageProcessor(String subscriberId, EventuateKafkaConsumerMessageHandler handler) {
        this.subscriberId = subscriberId;
        this.handler = handler;
    }

    /**
     * Registers the record's offset as unprocessed and passes the record to the handler.
     *
     * <p>If the handler returns a non-null {@link MessageConsumerBacklog}, it is remembered so
     * that {@link #backlog()} can include it.
     *
     * @param record the record to hand to the handler
     * @throws KafkaMessageProcessorFailedException if a previous record's completion callback
     *     reported a failure
     */
    public void process(ConsumerRecord<String, byte[]> record) {
        throwFailureException();
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        offsetTracker.noteUnprocessed(partition, record.offset());
        // The casts are retained because the handler's generic signature is declared outside this file.
        MessageConsumerBacklog consumerBacklog = (MessageConsumerBacklog) handler.apply(
                record, (result, t) -> handleMessagingHandlingOutcome(record, (Throwable) t));
        if (consumerBacklog != null) {
            consumerBacklogs.add(consumerBacklog);
        }
    }

    /**
     * Completion callback for a record: stores the failure, or queues the record as processed.
     *
     * @param record the record whose handling finished
     * @param t the reported failure, or {@code null} when handling succeeded
     */
    private void handleMessagingHandlingOutcome(ConsumerRecord<String, byte[]> record, Throwable t) {
        if (t == null) {
            logger.debug("Adding processed record to queue {} {}", subscriberId, record.offset());
            processedRecords.add(record);
            return;
        }
        logger.error("Got exception: ", t);
        failed.set(new KafkaMessageProcessorFailedException(t));
    }

    /**
     * Rethrows the stored handler failure, if there is one.
     *
     * @throws KafkaMessageProcessorFailedException if a completion callback has reported a failure
     */
    void throwFailureException() {
        // Read the reference once so the value that is checked is the value that is thrown.
        KafkaMessageProcessorFailedException failure = failed.get();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Drains the queue of successfully handled records into the offset tracker and returns the
     * tracker's current view of the offsets to commit.
     *
     * @return the result of {@link OffsetTracker#offsetsToCommit()} after the queue was drained
     */
    public Map<TopicPartition, OffsetAndMetadata> offsetsToCommit() {
        int count = 0;
        ConsumerRecord<String, byte[]> record;
        while ((record = processedRecords.poll()) != null) {
            count++;
            offsetTracker.noteProcessed(new TopicPartition(record.topic(), record.partition()), record.offset());
        }
        logger.trace("removed {} {} processed records from queue", subscriberId, count);
        return offsetTracker.offsetsToCommit();
    }

    /**
     * Forwards the committed offsets to the offset tracker.
     *
     * @param offsetsToCommit the offsets that were committed
     */
    public void noteOffsetsCommitted(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        offsetTracker.noteOffsetsCommitted(offsetsToCommit);
    }

    /**
     * Returns the offset tracker used by this processor.
     *
     * @return the internal {@link OffsetTracker} instance itself, not a copy
     */
    public OffsetTracker getPending() {
        return offsetTracker;
    }

    /**
     * Sums the sizes of all backlogs the handler has returned so far.
     *
     * @return the total of {@link MessageConsumerBacklog#size()} over the remembered backlogs
     */
    public int backlog() {
        return consumerBacklogs.stream().mapToInt(MessageConsumerBacklog::size).sum();
    }
}

