/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.kafka.eventhandling.consumer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.Assert;
import org.axonframework.common.ObjectUtils;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.kafka.eventhandling.consumer.Buffer;
import org.axonframework.kafka.eventhandling.consumer.KafkaEventMessage;
import org.axonframework.kafka.eventhandling.consumer.KafkaTrackingToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that repeatedly polls a Kafka {@link Consumer}, converts the fetched records into
 * {@link KafkaEventMessage}s, puts them into a {@link Buffer} and then invokes a callback for
 * every record that produced a message.
 * <p>
 * The loop runs until {@link #close()} is called, the thread is interrupted while handing
 * messages to the buffer, or any other exception escapes the loop. On exit the close handler is
 * notified and the Kafka consumer is closed.
 * <p>
 * The tracking token is only read and written from the constructor and from {@link #run()};
 * the {@code running} flag is an {@link AtomicBoolean}, so {@link #close()} can presumably be
 * invoked from a different thread than the one executing {@link #run()}.
 *
 * @param <K> key type of the consumed Kafka records
 * @param <V> value type of the consumed Kafka records
 */
class FetchEventsTask<K, V>
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FetchEventsTask.class);

    private final Consumer<K, V> consumer;
    private final Buffer<KafkaEventMessage> buffer;
    private final KafkaMessageConverter<K, V> converter;
    private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
    private final long timeout;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final java.util.function.Consumer<FetchEventsTask> closeHandler;
    /** Token reflecting the position of the last record that was converted into a message. */
    private KafkaTrackingToken currentToken;

    /**
     * Creates a task that fetches from the given {@code consumer}, starting from {@code token}.
     *
     * @param consumer     the Kafka consumer to poll; closed when {@link #run()} finishes
     * @param token        the initial tracking token, advanced for every converted record
     * @param buffer       destination for the converted messages
     * @param converter    converts a Kafka record into an (optional) event message
     * @param callback     invoked with each converted record and its token, after the batch
     *                     containing it has been put into the buffer
     * @param timeout      value passed to {@code consumer.poll}; must not be negative
     * @param closeHandler notified with this task when {@link #run()} finishes; may be
     *                     {@code null}, in which case a no-op handler is used
     */
    public FetchEventsTask(Consumer<K, V> consumer, KafkaTrackingToken token, Buffer<KafkaEventMessage> buffer, KafkaMessageConverter<K, V> converter, BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback, long timeout, java.util.function.Consumer<FetchEventsTask> closeHandler) {
        // Validation order is kept as-is so the first reported violation does not change.
        Assert.isFalse(timeout < 0L, () -> "Timeout may not be < 0");
        this.consumer = Assert.nonNull(consumer, () -> "Consumer may not be null");
        this.currentToken = Assert.nonNull(token, () -> "Token may not be null");
        this.buffer = Assert.nonNull(buffer, () -> "Buffer may not be null");
        this.converter = Assert.nonNull(converter, () -> "Converter may not be null");
        this.callback = Assert.nonNull(callback, () -> "Callback may not be null");
        this.timeout = timeout;
        this.closeHandler = ObjectUtils.getOrDefault(closeHandler, x -> {});
    }

    /**
     * Polls the consumer until the task is stopped.
     * <p>
     * For each poll: every record the converter yields a message for advances the tracking
     * token and is collected; the collected messages are put into the buffer in one call, and
     * only afterwards the callback is applied to each collected record in order.
     * <p>
     * Exceptions thrown inside the loop are logged and end the task; they are not rethrown.
     * Regardless of how the loop ends, the task is marked as stopped, the close handler is
     * invoked and the consumer is closed. The consumer is closed even when the close handler
     * throws; in that case the handler's exception propagates to the caller.
     */
    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<K, V> records = consumer.poll(timeout);
                if (logger.isDebugEnabled()) {
                    logger.debug("Fetched {} records", records.count());
                }

                List<KafkaEventMessage> messages = new ArrayList<>(records.count());
                List<CallbackEntry<K, V>> callbacks = new ArrayList<>(records.count());
                for (ConsumerRecord<K, V> record : records) {
                    // Records the converter cannot turn into a message are skipped entirely:
                    // they neither advance the token nor trigger the callback.
                    converter.readKafkaMessage(record).ifPresent(eventMessage -> {
                        KafkaTrackingToken nextToken = currentToken.advancedTo(record.partition(), record.offset());
                        if (logger.isDebugEnabled()) {
                            logger.debug("Updating token from {} -> {}", currentToken, nextToken);
                        }
                        currentToken = nextToken;
                        messages.add(KafkaEventMessage.from(eventMessage, record, currentToken));
                        callbacks.add(new CallbackEntry<>(currentToken, record));
                    });
                }

                try {
                    buffer.putAll(messages);
                    // Callbacks run only after the whole batch has been handed to the buffer.
                    for (CallbackEntry<K, V> entry : callbacks) {
                        callback.apply(entry.record, entry.token);
                    }
                } catch (InterruptedException e) {
                    running.set(false);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Event producer thread was interrupted. Shutting down.", e);
                    }
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (Exception e) {
            // The throwable is passed as the exception argument, so the message must not contain
            // a '{}' placeholder: it would be printed literally instead of being substituted.
            logger.error("Cannot proceed with Fetching, encountered an exception", e);
        } finally {
            running.set(false);
            try {
                closeHandler.accept(this);
            } finally {
                // Always release the Kafka consumer, even if the close handler fails.
                consumer.close();
            }
        }
    }

    /**
     * Requests the fetch loop to stop. The flag is checked at the start of each loop iteration,
     * so a poll that is already in progress is not interrupted by this call.
     */
    public void close() {
        running.set(false);
    }

    /**
     * Pairs a consumed record with the tracking token that was current right after the record
     * was processed, so the callback can be invoked once the batch has been buffered.
     */
    private static class CallbackEntry<K, V> {
        private final KafkaTrackingToken token;
        private final ConsumerRecord<K, V> record;

        public CallbackEntry(KafkaTrackingToken token, ConsumerRecord<K, V> record) {
            this.token = token;
            this.record = record;
        }
    }
}

