/*
 * Decompiled with CFR 0.152.
 */
package io.datanerds.verteiler;

import com.google.common.base.Joiner;
import io.datanerds.verteiler.BlockingQueueConsumer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerRecordRelay<K, V>
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerRecordRelay.class);
    private static final long POLLING_TIMEOUT_MS = 5000L;
    private volatile boolean stopped = false;
    private volatile boolean updateOffsets;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
    private final BlockingQueueConsumer<K, V> blockingQueueConsumer;
    private final Consumer<K, V> consumer;

    public ConsumerRecordRelay(Consumer<K, V> consumer, BlockingQueueConsumer<K, V> blockingQueueConsumer) {
        this.blockingQueueConsumer = blockingQueueConsumer;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        while (!this.stopped) {
            ConsumerRecords records = this.consumer.poll(5000L);
            for (ConsumerRecord record : records) {
                try {
                    this.blockingQueueConsumer.relay(record);
                }
                catch (InterruptedException ignored) {
                    logger.info("Interrupted during relay");
                }
                catch (Exception ex) {
                    logger.error("Error while relaying messages from kafka to queue: {}", (Object)ex.getMessage(), (Object)ex);
                    this.blockingQueueConsumer.stop();
                }
            }
            this.commitOffsets();
        }
        this.consumer.close();
        logger.info("Kafka message relay stopped");
    }

    public void setOffset(ConsumerRecord<K, V> record) {
        this.offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
        this.updateOffsets = true;
    }

    public void removePartitionFromOffset(TopicPartition topicPartition) {
        this.offsets.remove(topicPartition);
    }

    private void commitOffsets() {
        if (this.updateOffsets) {
            this.consumer.commitAsync(this.offsets, this::callback);
            this.updateOffsets = false;
        }
    }

    void stop() {
        logger.info("Stopping Kafka message relay");
        this.stopped = true;
    }

    private void callback(Map<TopicPartition, OffsetAndMetadata> offset, Exception ex) {
        if (ex != null) {
            Joiner.MapJoiner mapJoiner = Joiner.on((char)',').withKeyValueSeparator("=");
            logger.error("Error during offset commit: '{}'", (Object)mapJoiner.join(offset), (Object)ex);
        }
    }
}

