/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support.batching;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.StopWatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class KafkaRecordBatchingProcessor
extends KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class);
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;
    private final StopWatch timeoutWatch = new StopWatch();
    private final StopWatch intervalWatch = new StopWatch();
    private final Queue<Exchange> exchangeList;

    public KafkaRecordBatchingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
        this.configuration = configuration;
        this.processor = processor;
        this.commitManager = commitManager;
        this.exchangeList = new ArrayBlockingQueue<Exchange>(configuration.getMaxPollRecords());
    }

    public Exchange toExchange(KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord) {
        Exchange exchange = camelKafkaConsumer.createExchange(false);
        Message message = exchange.getMessage();
        this.setupExchangeMessage(message, consumerRecord);
        this.propagateHeaders(this.configuration, consumerRecord, exchange);
        if (this.configuration.isAllowManualCommit()) {
            KafkaManualCommit manual = this.commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
            message.setHeader("CamelKafkaManualCommit", (Object)manual);
        }
        return exchange;
    }

    public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
        LOG.debug("There's {} records to process ... max poll is set to {}", (Object)consumerRecords.count(), (Object)this.configuration.getMaxPollRecords());
        if (this.exchangeList.isEmpty()) {
            this.timeoutWatch.takenAndRestart();
        }
        if (this.hasExpiredRecords(consumerRecords)) {
            LOG.debug("The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing", (Object)this.exchangeList.size());
            ProcessingResult result = this.processBatch(camelKafkaConsumer);
            this.exchangeList.clear();
            this.timeoutWatch.takenAndRestart();
            if (result.isBreakOnErrorHit()) {
                return result;
            }
        }
        for (ConsumerRecord consumerRecord : consumerRecords) {
            TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            Exchange childExchange = this.toExchange(camelKafkaConsumer, tp, (ConsumerRecord<Object, Object>)consumerRecord);
            this.exchangeList.add(childExchange);
            if (this.exchangeList.size() < this.configuration.getMaxPollRecords()) continue;
            ProcessingResult result = this.processBatch(camelKafkaConsumer);
            this.exchangeList.clear();
            this.timeoutWatch.takenAndRestart();
            if (!result.isBreakOnErrorHit()) continue;
            return result;
        }
        return ProcessingResult.newUnprocessed();
    }

    private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
        if (this.exchangeList.isEmpty()) {
            return false;
        }
        boolean timeout = consumerRecords.isEmpty() && this.timeoutWatch.taken() >= this.configuration.getPollTimeoutMs();
        boolean interval = this.configuration.getBatchingIntervalMs() != null && this.intervalWatch.taken() >= (long)this.configuration.getBatchingIntervalMs().intValue();
        return timeout || interval;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) {
        this.intervalWatch.restart();
        Exchange exchange = camelKafkaConsumer.createExchange(false);
        Message message = exchange.getMessage();
        List<Exchange> exchanges = this.exchangeList.stream().toList();
        message.setBody(exchanges);
        ProcessingResult result = ProcessingResult.newUnprocessed();
        try {
            if (this.configuration.isAllowManualCommit()) {
                Exchange last;
                Exchange exchange2 = last = exchanges.isEmpty() ? null : exchanges.get(exchanges.size() - 1);
                if (last != null) {
                    message.setHeader("CamelKafkaManualCommit", last.getMessage().getHeader("CamelKafkaManualCommit"));
                }
                result = this.manualCommitResultProcessing(camelKafkaConsumer, exchange, exchanges);
            } else {
                result = this.autoCommitResultProcessing(camelKafkaConsumer, exchange, exchanges);
            }
        }
        finally {
            camelKafkaConsumer.releaseExchange(exchange, false);
        }
        return result;
    }

    private ProcessingResult autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange, List<Exchange> exchanges) {
        ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
        CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler, exchanges.size());
        exchange.getExchangeExtension().addOnCompletion((Synchronization)commitSynchronization);
        try {
            this.processor.process(exchange);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
        }
        if (exchange.getException() != null && this.configuration.isBreakOnFirstError()) {
            Exchange firstExchange;
            Exchange exchange2 = firstExchange = exchanges.isEmpty() ? null : exchanges.get(0);
            if (firstExchange != null) {
                Message message = firstExchange.getMessage();
                String topic = (String)message.getHeader("kafka.TOPIC");
                Integer partition = (Integer)message.getHeader("kafka.PARTITION");
                Long offset = (Long)message.getHeader("kafka.OFFSET");
                if (topic != null && partition != null && offset != null) {
                    return new ProcessingResult(true, true, topic, partition, offset);
                }
            }
            return new ProcessingResult(true, true);
        }
        return ProcessingResult.newUnprocessed();
    }

    private ProcessingResult manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange, List<Exchange> exchanges) {
        ExceptionHandler exceptionHandler;
        boolean breakOnErrorHit;
        try {
            this.processor.process(exchange);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
        }
        if (exchange.getException() != null && (breakOnErrorHit = this.processException(exchange, exceptionHandler = camelKafkaConsumer.getExceptionHandler()))) {
            Exchange firstExchange;
            Exchange exchange2 = firstExchange = exchanges.isEmpty() ? null : exchanges.get(0);
            if (firstExchange != null) {
                Message message = firstExchange.getMessage();
                String topic = (String)message.getHeader("kafka.TOPIC");
                Integer partition = (Integer)message.getHeader("kafka.PARTITION");
                Long offset = (Long)message.getHeader("kafka.OFFSET");
                if (topic != null && partition != null && offset != null) {
                    return new ProcessingResult(true, true, topic, partition, offset);
                }
            }
            return new ProcessingResult(true, true);
        }
        return ProcessingResult.newUnprocessed();
    }

    private boolean processException(Exchange exchange, ExceptionHandler exceptionHandler) {
        if (this.configuration.isBreakOnFirstError()) {
            if (LOG.isWarnEnabled()) {
                Exception exc = exchange.getException();
                LOG.warn("Error during processing {} from batch due to {}", (Object)exchange, (Object)exc.getMessage());
                LOG.warn("Will break on first error in batch processing mode as configured.");
            }
            this.commitManager.commit();
            return true;
        }
        exceptionHandler.handleException("Error during processing", exchange, (Throwable)exchange.getException());
        return false;
    }

    private final class CommitSynchronization
    implements Synchronization {
        private final ExceptionHandler exceptionHandler;
        private final int size;

        public CommitSynchronization(ExceptionHandler exceptionHandler, int size) {
            this.exceptionHandler = exceptionHandler;
            this.size = size;
        }

        public void onComplete(Exchange exchange) {
            LOG.debug("Calling commit on {} exchanges using {}", (Object)this.size, (Object)KafkaRecordBatchingProcessor.this.commitManager.getClass().getSimpleName());
            KafkaRecordBatchingProcessor.this.commitManager.commit();
        }

        public void onFailure(Exchange exchange) {
            Exception cause = exchange.getException();
            if (cause != null) {
                if (KafkaRecordBatchingProcessor.this.configuration.isBreakOnFirstError()) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Error during processing {} from batch due to {}", (Object)exchange, (Object)cause.getMessage());
                        LOG.warn("Will break on first error in batch processing mode as configured.");
                    }
                    LOG.debug("Calling commit on {} exchanges using {} due to breakOnFirstError", (Object)this.size, (Object)KafkaRecordBatchingProcessor.this.commitManager.getClass().getSimpleName());
                    KafkaRecordBatchingProcessor.this.commitManager.commit();
                } else {
                    this.exceptionHandler.handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, (Throwable)cause);
                }
            } else {
                LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled");
            }
        }
    }
}

