/*
 * Decompiled with CFR 0.152.
 */
package io.datanerds.verteiler;

import io.datanerds.verteiler.ConsumerRecordRelay;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that drains a bounded queue of {@link ConsumerRecord}s for a single
 * {@link TopicPartition}, applying the configured action to every record value and
 * then handing the record to the {@link ConsumerRecordRelay}.
 *
 * <p>Records are enqueued from other threads via {@link #queue(ConsumerRecord)} and
 * consumed by the thread executing {@link #run()}. The {@code stopped} flag is
 * {@code volatile} so that it can be written and read across those threads.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
class Processor<K, V> implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Processor.class);

    private volatile boolean stopped = false;
    private final BlockingQueue<ConsumerRecord<K, V>> queue;
    private final ConsumerRecordRelay<K, V> relay;
    private final Consumer<V> action;
    private final TopicPartition topicPartition;

    /**
     * Creates a processor with an empty, bounded record queue.
     *
     * @param topicPartition partition this processor is responsible for; also used as the
     *                       name of the thread executing {@link #run()} and in log output
     * @param relay          relay that receives every record after the action has accepted it
     * @param action         callback invoked with the value of each record
     * @param queueSize      capacity of the internal queue; must be positive, otherwise
     *                       {@link ArrayBlockingQueue} throws {@link IllegalArgumentException}
     */
    Processor(TopicPartition topicPartition, ConsumerRecordRelay<K, V> relay, Consumer<V> action, int queueSize) {
        this.queue = new ArrayBlockingQueue<ConsumerRecord<K, V>>(queueSize);
        this.relay = relay;
        this.action = action;
        this.topicPartition = topicPartition;
    }

    /**
     * Processes queued records until the processor is stopped, the thread is interrupted,
     * or processing fails.
     *
     * <p>For each record the action is invoked with the record value first and only then
     * is the record passed to {@code relay.setOffset(...)}, so a record whose action throws
     * is never reported to the relay.
     *
     * <p>On every exit path, including an {@link Error} thrown by the action or relay, the
     * processor is marked as stopped and any still-queued records are discarded. If the loop
     * ended because of an interrupt, the thread's interrupt status is restored before
     * returning so that the owner of the thread can observe it.
     */
    @Override
    public void run() {
        Thread.currentThread().setName(this.topicPartition.toString());
        logger.info("Processor for {} started", this.topicPartition);
        boolean interrupted = false;
        try {
            while (!this.stopped) {
                // Blocks until a record is available; stop() alone does not wake this call,
                // the thread has to be interrupted (or receive another record) to notice it.
                ConsumerRecord<K, V> record = this.queue.take();
                this.action.accept(record.value());
                this.relay.setOffset(record);
            }
        } catch (InterruptedException ex) {
            interrupted = true;
            logger.debug("Processor for {} interrupted while waiting for messages", this.topicPartition);
        } catch (Exception ex) {
            // The trailing Throwable argument is logged by SLF4J together with its stack trace.
            logger.error("Exception during processing {}. Stopping!", this.topicPartition, ex);
        } finally {
            // Executed for Errors as well, so isStopped() never reports a dead processor as alive.
            this.stop();
            this.queue.clear();
            logger.info("Processor for {} stopped", this.topicPartition);
            if (interrupted) {
                // Restore the status last so the cleanup above does not run on an interrupted thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Requests termination of the processing loop.
     *
     * <p>The flag is checked before each record is taken from the queue; a thread that is
     * currently blocked waiting on an empty queue is not woken up by this call.
     */
    public void stop() {
        this.stopped = true;
    }

    /**
     * Adds a record to the internal queue, blocking while the queue is full.
     *
     * @param record record to enqueue; must not be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *                              free capacity
     */
    public void queue(ConsumerRecord<K, V> record) throws InterruptedException {
        this.queue.put(record);
    }

    /**
     * Reports whether the processor has been stopped.
     *
     * @return {@code true} once {@link #stop()} has been called or {@link #run()} has
     *         terminated
     */
    public boolean isStopped() {
        return this.stopped;
    }
}

