/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordProcessorFacade {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
    private final KafkaConsumer camelKafkaConsumer;
    private final Map<String, Long> lastProcessedOffset;
    private final String threadId;
    private final KafkaRecordProcessor kafkaRecordProcessor;
    private final CommitManager commitManager;

    public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, Map<String, Long> lastProcessedOffset, String threadId, CommitManager commitManager) {
        this.camelKafkaConsumer = camelKafkaConsumer;
        this.lastProcessedOffset = lastProcessedOffset;
        this.threadId = threadId;
        this.commitManager = commitManager;
        this.kafkaRecordProcessor = this.buildKafkaRecordProcessor(commitManager);
    }

    private boolean isStopping() {
        return this.camelKafkaConsumer.isStopping();
    }

    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
        this.logRecords(allRecords);
        Set partitions = allRecords.partitions();
        Iterator partitionIterator = partitions.iterator();
        ProcessingResult lastResult = ProcessingResult.newUnprocessed();
        while (partitionIterator.hasNext() && !this.isStopping()) {
            lastResult = ProcessingResult.newUnprocessed();
            TopicPartition partition = (TopicPartition)partitionIterator.next();
            List partitionRecords = allRecords.records(partition);
            Iterator recordIterator = partitionRecords.iterator();
            this.logRecordsInPartition(partitionRecords, partition);
            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !this.isStopping()) {
                ConsumerRecord record = (ConsumerRecord)recordIterator.next();
                lastResult = this.processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, this.kafkaRecordProcessor, (ConsumerRecord<Object, Object>)record);
            }
            if (lastResult.isBreakOnErrorHit()) continue;
            LOG.debug("Committing offset on successful execution");
            this.commitManager.commitOffset(partition, lastResult.getPartitionLastOffset());
        }
        return lastResult;
    }

    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Records count {} received for partition {}", (Object)partitionRecords.size(), (Object)partition);
        }
    }

    private void logRecords(ConsumerRecords<Object, Object> allRecords) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Last poll on thread {} resulted on {} records to process", (Object)this.threadId, (Object)allRecords.count());
        }
    }

    private ProcessingResult processRecord(TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, ProcessingResult lastResult, KafkaRecordProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> record) {
        this.logRecord(record);
        Exchange exchange = this.camelKafkaConsumer.createExchange(false);
        ProcessingResult currentResult = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, recordHasNext, record, lastResult, this.camelKafkaConsumer.getExceptionHandler());
        if (!currentResult.isBreakOnErrorHit()) {
            this.lastProcessedOffset.put(KafkaRecordProcessor.serializeOffsetKey(partition), currentResult.getPartitionLastOffset());
        }
        this.camelKafkaConsumer.releaseExchange(exchange, false);
        return currentResult;
    }

    private void logRecord(ConsumerRecord<Object, Object> record) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", new Object[]{record.partition(), record.offset(), record.key(), record.value()});
        }
    }

    private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
        return new KafkaRecordProcessor(this.camelKafkaConsumer.getEndpoint().getConfiguration(), this.camelKafkaConsumer.getProcessor(), commitManager);
    }
}

