/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaAsyncManualCommit;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.camel.component.kafka.PollOnError;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaFetchRecords
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
    private final KafkaConsumer kafkaConsumer;
    private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
    private final String topicName;
    private final Pattern topicPattern;
    private final String threadId;
    private final Properties kafkaProps;
    private final Map<String, Long> lastProcessedOffset = new HashMap<String, Long>();
    private final PollExceptionStrategy pollExceptionStrategy;
    private final BridgeExceptionHandlerToErrorHandler bridge;
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = new ConcurrentLinkedQueue();
    private boolean retry = true;
    private boolean reconnect = true;

    KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, Properties kafkaProps) {
        this.kafkaConsumer = kafkaConsumer;
        this.pollExceptionStrategy = pollExceptionStrategy;
        this.bridge = bridge;
        this.topicName = topicName;
        this.topicPattern = topicPattern;
        this.threadId = topicName + "-Thread " + id;
        this.kafkaProps = kafkaProps;
    }

    void preInit() {
        this.createConsumer();
    }

    @Override
    public void run() {
        if (!this.isKafkaConsumerRunnable()) {
            return;
        }
        if (this.isRetrying() || this.isReconnecting()) {
            try {
                if (this.isReconnecting()) {
                    this.createConsumer();
                }
            }
            catch (Exception e) {
                LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", (Object)e.getMessage(), (Object)e);
            }
            long delay = this.kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
            String prefix = this.isReconnecting() ? "Reconnecting" : "Retrying";
            LOG.info("{} {} to topic {} after {} ms", new Object[]{prefix, this.threadId, this.topicName, delay});
            this.doRun();
        }
        LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", (Object)this.threadId, (Object)this.topicName);
        this.safeUnsubscribe();
        IOHelper.close((Closeable)this.consumer);
    }

    protected void createConsumer() {
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            this.consumer = this.kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(this.kafkaProps);
        }
        finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }
    }

    protected void doRun() {
        if (this.isReconnecting()) {
            this.subscribe();
            this.setReconnect(false);
            this.setRetry(true);
        }
        this.startPolling();
    }

    private void subscribe() {
        PartitionAssignmentListener listener = new PartitionAssignmentListener(this.threadId, this.topicName, this.kafkaConsumer.getEndpoint().getConfiguration(), this.consumer, this.lastProcessedOffset, this::isRunnable);
        if (this.topicPattern != null) {
            LOG.info("Subscribing {} to topic pattern {}", (Object)this.threadId, (Object)this.topicName);
            this.consumer.subscribe(this.topicPattern, (ConsumerRebalanceListener)listener);
        } else {
            LOG.info("Subscribing {} to topic {}", (Object)this.threadId, (Object)this.topicName);
            this.consumer.subscribe(Arrays.asList(this.topicName.split(",")), (ConsumerRebalanceListener)listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startPolling() {
        long partitionLastOffset = -1L;
        try {
            this.lock.lock();
            long pollTimeoutMs = this.kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
            LOG.trace("Polling {} from topic: {} with timeout: {}", new Object[]{this.threadId, this.topicName, pollTimeoutMs});
            KafkaRecordProcessor kafkaRecordProcessor = this.buildKafkaRecordProcessor();
            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
            while (this.isKafkaConsumerRunnable() && this.isRetrying() && !this.isReconnecting()) {
                ConsumerRecords allRecords = this.consumer.poll(pollDuration);
                this.processAsyncCommits();
                partitionLastOffset = this.processPolledRecords((ConsumerRecords<Object, Object>)allRecords, kafkaRecordProcessor);
            }
            if (!this.isReconnecting()) {
                LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
                this.commit();
            }
            this.safeUnsubscribe();
        }
        catch (InterruptException e) {
            this.kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", (Throwable)e);
            this.commit();
            LOG.info("Unsubscribing {} from topic {}", (Object)this.threadId, (Object)this.topicName);
            this.safeUnsubscribe();
            Thread.currentThread().interrupt();
        }
        catch (WakeupException e) {
            LOG.trace("The kafka consumer was woken up while polling on thread {} for topic {}", (Object)this.threadId, (Object)this.topicName);
            this.safeUnsubscribe();
        }
        catch (Exception e) {
            if (LOG.isDebugEnabled()) {
                LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", new Object[]{e.getClass().getName(), this.threadId, this.topicName, this.lastProcessedOffset, e.getMessage(), e});
            } else {
                LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", new Object[]{e.getClass().getName(), this.threadId, this.topicName, this.lastProcessedOffset, e.getMessage()});
            }
            this.handleAccordingToStrategy(partitionLastOffset, e);
        }
        finally {
            this.lock.unlock();
            if (!this.isRetrying()) {
                LOG.debug("Closing consumer {}", (Object)this.threadId);
                IOHelper.close((Closeable)this.consumer);
            }
        }
    }

    private void processAsyncCommits() {
        while (!this.asyncCommits.isEmpty()) {
            this.asyncCommits.poll().processAsyncCommit();
        }
    }

    private void handleAccordingToStrategy(long partitionLastOffset, Exception e) {
        PollOnError onError = this.pollExceptionStrategy.handleException(e);
        if (PollOnError.RETRY == onError) {
            this.handlePollRetry();
        } else if (PollOnError.RECONNECT == onError) {
            this.handlePollReconnect();
        } else if (PollOnError.ERROR_HANDLER == onError) {
            this.handlePollErrorHandler(partitionLastOffset, e);
        } else if (PollOnError.DISCARD == onError) {
            this.handlePollDiscard(partitionLastOffset);
        } else if (PollOnError.STOP == onError) {
            this.handlePollStop();
        }
    }

    private void safeUnsubscribe() {
        try {
            this.consumer.unsubscribe();
        }
        catch (Exception e) {
            this.kafkaConsumer.getExceptionHandler().handleException("Error unsubscribing " + this.threadId + " from kafka topic " + this.topicName, (Throwable)e);
        }
    }

    private void commit() {
        this.processAsyncCommits();
        if (this.isAutoCommitEnabled()) {
            if ("async".equals(this.kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
                LOG.info("Auto commitAsync on stop {} from topic {}", (Object)this.threadId, (Object)this.topicName);
                this.consumer.commitAsync();
            } else if ("sync".equals(this.kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
                LOG.info("Auto commitSync on stop {} from topic {}", (Object)this.threadId, (Object)this.topicName);
                this.consumer.commitSync();
            } else if ("none".equals(this.kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
                LOG.info("Auto commit on stop {} from topic {} is disabled (none)", (Object)this.threadId, (Object)this.topicName);
            }
        }
    }

    private void handlePollStop() {
        LOG.warn("Requesting the consumer to stop based on polling exception strategy");
        this.setRetry(false);
        this.setReconnect(false);
    }

    private void handlePollDiscard(long partitionLastOffset) {
        LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
        this.seekToNextOffset(partitionLastOffset);
    }

    private void handlePollErrorHandler(long partitionLastOffset, Exception e) {
        LOG.warn("Deferring processing to the exception handler based on polling exception strategy");
        this.bridge.handleException((Throwable)e);
        this.seekToNextOffset(partitionLastOffset);
    }

    private void handlePollReconnect() {
        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
        this.setReconnect(true);
        this.setRetry(false);
    }

    private void handlePollRetry() {
        LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy");
        this.setRetry(true);
    }

    private boolean isKafkaConsumerRunnable() {
        return this.kafkaConsumer.isRunAllowed() && !this.kafkaConsumer.isStoppingOrStopped() && !this.kafkaConsumer.isSuspendingOrSuspended();
    }

    private boolean isRunnable() {
        return this.kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !this.kafkaConsumer.isRunAllowed();
    }

    private long processPolledRecords(ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor) {
        this.logRecords(allRecords);
        Set partitions = allRecords.partitions();
        Iterator partitionIterator = partitions.iterator();
        KafkaRecordProcessor.ProcessResult lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
        while (partitionIterator.hasNext() && !this.isStopping()) {
            lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
            TopicPartition partition = (TopicPartition)partitionIterator.next();
            List partitionRecords = allRecords.records(partition);
            Iterator recordIterator = partitionRecords.iterator();
            this.logRecordsInPartition(partitionRecords, partition);
            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !this.isStopping()) {
                ConsumerRecord record = (ConsumerRecord)recordIterator.next();
                lastResult = this.processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, kafkaRecordProcessor, (ConsumerRecord<Object, Object>)record);
            }
            if (lastResult.isBreakOnErrorHit()) continue;
            LOG.debug("Committing offset on successful execution");
            kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false);
        }
        if (lastResult.isBreakOnErrorHit()) {
            LOG.debug("We hit an error ... setting flags to force reconnect");
            this.setReconnect(true);
            this.setRetry(false);
        }
        return lastResult.getPartitionLastOffset();
    }

    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Records count {} received for partition {}", (Object)partitionRecords.size(), (Object)partition);
        }
    }

    private void logRecords(ConsumerRecords<Object, Object> allRecords) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Last poll on thread {} resulted on {} records to process", (Object)this.threadId, (Object)allRecords.count());
        }
    }

    private KafkaRecordProcessor.ProcessResult processRecord(TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, KafkaRecordProcessor.ProcessResult lastResult, KafkaRecordProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> record) {
        this.logRecord(record);
        Exchange exchange = this.kafkaConsumer.createExchange(false);
        KafkaRecordProcessor.ProcessResult currentResult = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, recordHasNext, record, lastResult, this.kafkaConsumer.getExceptionHandler());
        if (!currentResult.isBreakOnErrorHit()) {
            this.lastProcessedOffset.put(KafkaRecordProcessor.serializeOffsetKey(partition), currentResult.getPartitionLastOffset());
        }
        this.kafkaConsumer.releaseExchange(exchange, false);
        return currentResult;
    }

    private void logRecord(ConsumerRecord<Object, Object> record) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", new Object[]{record.partition(), record.offset(), record.key(), record.value()});
        }
    }

    private KafkaRecordProcessor buildKafkaRecordProcessor() {
        return new KafkaRecordProcessor(this.isAutoCommitEnabled(), this.kafkaConsumer.getEndpoint().getConfiguration(), this.kafkaConsumer.getProcessor(), this.consumer, this.kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory(), this.threadId, this.asyncCommits);
    }

    private void seekToNextOffset(long partitionLastOffset) {
        block4: {
            Set tps;
            boolean logged;
            block3: {
                logged = false;
                tps = this.consumer.assignment();
                if (tps == null || partitionLastOffset == -1L) break block3;
                long next = partitionLastOffset + 1L;
                LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", (Object)next, (Object)this.topicName);
                for (TopicPartition tp : tps) {
                    this.consumer.seek(tp, next);
                }
                break block4;
            }
            if (tps == null) break block4;
            for (TopicPartition tp : tps) {
                long next = this.consumer.position(tp) + 1L;
                if (!logged) {
                    LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", (Object)next, (Object)this.topicName);
                    logged = true;
                }
                this.consumer.seek(tp, next);
            }
        }
    }

    private boolean isRetrying() {
        return this.retry;
    }

    private void setRetry(boolean value) {
        this.retry = value;
    }

    private boolean isReconnecting() {
        return this.reconnect;
    }

    private void setReconnect(boolean value) {
        this.reconnect = value;
    }

    private void setStopping(boolean value) {
        this.stopping.set(value);
    }

    private boolean isStopping() {
        return this.stopping.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void safeStop() {
        this.setStopping(true);
        long timeout = this.kafkaConsumer.getEndpoint().getConfiguration().getShutdownTimeout();
        try {
            LOG.info("Waiting up to {} milliseconds for the processing to finish", (Object)timeout);
            if (!this.lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
                LOG.warn("The processing of the current record did not finish within {} seconds", (Object)timeout);
            }
            this.consumer.wakeup();
        }
        catch (InterruptedException e) {
            this.consumer.wakeup();
            Thread.currentThread().interrupt();
        }
        finally {
            this.lock.unlock();
        }
    }

    void stop() {
        this.safeStop();
    }

    void shutdown() {
        this.safeStop();
    }

    private boolean isAutoCommitEnabled() {
        return this.kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable() != null && this.kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable() != false;
    }
}

