/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer.support;

import java.time.Duration;
import java.util.Collections;
import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.component.kafka.KafkaManualCommitFactory;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Turns a single Kafka {@link ConsumerRecord} into a Camel {@link Exchange}, hands it to the
 * configured {@link Processor} and takes care of offset bookkeeping (offset repository or
 * consumer commits) for the partition the record came from.
 */
public class KafkaRecordProcessor {
    /** Sentinel offset meaning "nothing has been processed for this partition yet". */
    public static final long START_OFFSET = -1L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
    private final boolean autoCommitEnabled;
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final KafkaConsumer<?, ?> consumer;
    private final KafkaManualCommitFactory manualCommitFactory;
    private final String threadId;

    /**
     * Creates a record processor bound to one consumer.
     *
     * @param autoCommitEnabled   whether auto commit is enabled; when {@code false} the
     *                            "last record" headers are added to every message
     * @param configuration       endpoint configuration consulted for header handling, manual commit,
     *                            break-on-first-error and commit settings
     * @param processor           processor that receives each exchange
     * @param consumer            consumer used for committing offsets
     * @param manualCommitFactory factory for the {@link KafkaManualCommit} placed on the message when
     *                            manual commit is allowed
     * @param threadId            identifier used in log messages and passed to the manual commit factory
     */
    public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration configuration, Processor processor, KafkaConsumer<?, ?> consumer, KafkaManualCommitFactory manualCommitFactory, String threadId) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.configuration = configuration;
        this.processor = processor;
        this.consumer = consumer;
        this.manualCommitFactory = manualCommitFactory;
        this.threadId = threadId;
    }

    /**
     * Copies the record's coordinates (partition, topic, offset, headers, timestamp, optional key)
     * into message headers and uses the record value as the message body.
     */
    private void setupExchangeMessage(Message message, ConsumerRecord<Object, Object> record) {
        message.setHeader("kafka.PARTITION", record.partition());
        message.setHeader("kafka.TOPIC", record.topic());
        message.setHeader("kafka.OFFSET", record.offset());
        message.setHeader("kafka.HEADERS", record.headers());
        message.setHeader("kafka.TIMESTAMP", record.timestamp());
        message.setHeader("CamelMessageTimestamp", record.timestamp());
        // The key header is only present when the record actually carries a key.
        if (record.key() != null) {
            message.setHeader("kafka.KEY", record.key());
        }
        message.setBody(record.value());
    }

    /**
     * Decides whether a Kafka header is kept for propagation.
     *
     * @return {@code true} when the strategy's {@code applyFilterToExternalHeaders} returns
     *         {@code false} for this header, i.e. the header passes and should be copied
     */
    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
    }

    /**
     * Copies every Kafka record header that passes the configured header filter strategy onto the
     * exchange's IN message, deserializing the value with the configured header deserializer.
     */
    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
        HeaderFilterStrategy headerFilterStrategy = this.configuration.getHeaderFilterStrategy();
        KafkaHeaderDeserializer headerDeserializer = this.configuration.getHeaderDeserializer();
        StreamSupport.stream(record.headers().spliterator(), false)
                .filter(header -> this.shouldBeFiltered(header, exchange, headerFilterStrategy))
                .forEach(header -> exchange.getIn().setHeader(header.key(), headerDeserializer.deserialize(header.key(), header.value())));
    }

    /**
     * Populates the exchange from the record, runs the processor and reports the outcome.
     *
     * @param exchange         exchange to populate and process
     * @param partition        partition the record belongs to
     * @param partitionHasNext whether more partitions remain in the current poll
     * @param recordHasNext    whether more records remain in the current partition
     * @param record           record being processed
     * @param lastResult       result of the previous record; its offset is reported again when this
     *                         record fails
     * @param exceptionHandler handler invoked on failure when break-on-first-error is disabled
     * @return on success, a result carrying this record's offset; on failure, a result carrying the
     *         previous offset and a flag telling whether break-on-first-error was triggered
     */
    public ProcessResult processExchange(Exchange exchange, TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessResult lastResult, ExceptionHandler exceptionHandler) {
        Message message = exchange.getMessage();
        this.setupExchangeMessage(message, record);
        this.propagateHeaders(record, exchange);

        boolean lastRecordInPartition = !recordHasNext;
        boolean lastRecordInPoll = !recordHasNext && !partitionHasNext;

        // These headers are boolean flags; set them as Boolean rather than 0/1 integers.
        if (!this.autoCommitEnabled) {
            message.setHeader("kafka.LAST_RECORD_BEFORE_COMMIT", lastRecordInPartition);
            message.setHeader("kafka.LAST_POLL_RECORD", lastRecordInPoll);
        }
        if (this.configuration.isAllowManualCommit()) {
            StateRepository<String, String> offsetRepository = this.configuration.getOffsetRepository();
            // Expose a manual-commit handle for this record on the message.
            KafkaManualCommit manual = this.manualCommitFactory.newInstance(exchange, this.consumer, partition.topic(), this.threadId, offsetRepository, partition, record.offset(), this.configuration.getCommitTimeoutMs());
            message.setHeader("CamelKafkaManualCommit", manual);
            message.setHeader("kafka.LAST_POLL_RECORD", lastRecordInPoll);
        }

        try {
            this.processor.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }

        // A failure may be thrown or merely recorded on the exchange; handle both the same way so a
        // failed record never advances the partition offset.
        if (exchange.getException() != null) {
            long previousOffset = lastResult.getPartitionLastOffset();
            boolean breakOnErrorExit = this.processException(exchange, partition, previousOffset, exceptionHandler);
            return new ProcessResult(breakOnErrorExit, previousOffset);
        }
        return new ProcessResult(false, record.offset());
    }

    /**
     * Reacts to a failed exchange.
     *
     * @return {@code true} when break-on-first-error is enabled (the last good offset is force
     *         committed and the caller is expected to stop); {@code false} when the failure was
     *         passed to the exception handler
     */
    private boolean processException(Exchange exchange, TopicPartition partition, long partitionLastOffset, ExceptionHandler exceptionHandler) {
        if (this.configuration.isBreakOnFirstError()) {
            // The trailing Throwable argument is logged by SLF4J as the exception.
            LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException());
            LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset);
            this.commitOffset(partition, partitionLastOffset, false, true);
            return true;
        }
        exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
        return false;
    }

    /**
     * Instance convenience for
     * {@link #commitOffset(KafkaConfiguration, KafkaConsumer, TopicPartition, long, boolean, boolean, String)}
     * using this processor's configuration, consumer and thread id.
     */
    public void commitOffset(TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit) {
        KafkaRecordProcessor.commitOffset(this.configuration, this.consumer, partition, partitionLastOffset, stopping, forceCommit, this.threadId);
    }

    /**
     * Persists the last processed offset of a partition.
     * <ul>
     * <li>Nothing happens when {@code partitionLastOffset} equals {@link #START_OFFSET}.</li>
     * <li>If manual commit is not allowed and an offset repository is configured, the offset is
     * stored in the repository.</li>
     * <li>Otherwise, when {@code stopping}, the configured auto-commit-on-stop mode
     * ({@code "async"}, {@code "sync"} or {@code "none"}) decides how/whether to commit.</li>
     * <li>Otherwise, when {@code forceCommit}, a synchronous commit is issued.</li>
     * </ul>
     * Consumer commits use {@code partitionLastOffset + 1}.
     *
     * @param configuration       configuration supplying repository and commit settings
     * @param consumer            consumer used for the commit
     * @param partition           partition whose offset is persisted
     * @param partitionLastOffset offset of the last successfully processed record
     * @param stopping            whether the consumer is stopping
     * @param forceCommit         whether to commit synchronously even though not stopping
     * @param threadId            identifier used in log messages
     */
    public static void commitOffset(KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit, String threadId) {
        if (partitionLastOffset == START_OFFSET) {
            return;
        }
        StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
        if (!configuration.isAllowManualCommit() && offsetRepository != null) {
            LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), partition.partition(), partitionLastOffset);
            offsetRepository.setState(KafkaRecordProcessor.serializeOffsetKey(partition), KafkaRecordProcessor.serializeOffsetValue(partitionLastOffset));
        } else if (stopping) {
            String autoCommitOnStop = configuration.getAutoCommitOnStop();
            // Constant-first equals keeps a null mode harmless (no commit, no log).
            if ("async".equals(autoCommitOnStop)) {
                LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic());
                consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1L)), null);
            } else if ("sync".equals(autoCommitOnStop)) {
                LOG.debug("Auto commitSync on stop {} from topic {}", threadId, partition.topic());
                KafkaRecordProcessor.commitOffset(configuration, consumer, partition, partitionLastOffset);
            } else if ("none".equals(autoCommitOnStop)) {
                LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, partition.topic());
            }
        } else if (forceCommit) {
            LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), partition.partition(), partitionLastOffset);
            KafkaRecordProcessor.commitOffset(configuration, consumer, partition, partitionLastOffset);
        }
    }

    /**
     * Synchronously commits {@code partitionLastOffset + 1} for the partition, bounded by the
     * configured commit timeout (milliseconds).
     */
    private static void commitOffset(KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset) {
        long timeout = configuration.getCommitTimeoutMs();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1L)), Duration.ofMillis(timeout));
    }

    /**
     * Builds the offset repository key for a partition.
     *
     * @return {@code "<topic>/<partition>"}
     */
    public static String serializeOffsetKey(TopicPartition topicPartition) {
        return topicPartition.topic() + '/' + topicPartition.partition();
    }

    /**
     * Converts an offset to the string form stored in the offset repository.
     *
     * @return decimal string representation of {@code offset}
     */
    public static String serializeOffsetValue(long offset) {
        return String.valueOf(offset);
    }

    /**
     * Parses an offset previously produced by {@link #serializeOffsetValue(long)}.
     *
     * @throws NumberFormatException if {@code offset} is not a parsable {@code long}
     */
    public static long deserializeOffsetValue(String offset) {
        return Long.parseLong(offset);
    }

    /**
     * Immutable outcome of processing one record: whether break-on-first-error was triggered and the
     * last offset considered successfully processed for the partition.
     */
    public static final class ProcessResult {
        // Shared instance; safe to share because the fields are final.
        private static final ProcessResult UNPROCESSED_RESULT = new ProcessResult(false, START_OFFSET);
        private final boolean breakOnErrorHit;
        private final long partitionLastOffset;

        private ProcessResult(boolean breakOnErrorHit, long partitionLastOffset) {
            this.breakOnErrorHit = breakOnErrorHit;
            this.partitionLastOffset = partitionLastOffset;
        }

        /** @return {@code true} when processing failed and break-on-first-error was applied */
        public boolean isBreakOnErrorHit() {
            return this.breakOnErrorHit;
        }

        /** @return last successfully processed offset, or {@link KafkaRecordProcessor#START_OFFSET} if none */
        public long getPartitionLastOffset() {
            return this.partitionLastOffset;
        }

        /** @return the shared result representing "no record processed yet" */
        public static ProcessResult newUnprocessed() {
            return UNPROCESSED_RESULT;
        }
    }
}

