/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.resume.kafka;

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiNodeKafkaResumeStrategy<K, V>
extends SingleNodeKafkaResumeStrategy<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
    private final ExecutorService executorService;

    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter) {
        this(bootstrapServers, topic, resumeCache, resumeAdapter, Executors.newSingleThreadExecutor());
    }

    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter, ExecutorService executorService) {
        super(bootstrapServers, topic, resumeCache, resumeAdapter);
        this.executorService = executorService;
        executorService.submit(() -> this.refresh());
    }

    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter, Properties producerConfig, Properties consumerConfig) {
        this(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig, Executors.newSingleThreadExecutor());
    }

    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter, Properties producerConfig, Properties consumerConfig, ExecutorService executorService) {
        super(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig);
        this.executorService = executorService;
        executorService.submit(() -> this.refresh());
    }

    /*
     * Unable to fully structure code
     */
    protected void refresh() {
        MultiNodeKafkaResumeStrategy.LOG.trace("Creating a offset cache refresher");
        try {
            prop = (Properties)this.getConsumerConfig().clone();
            prop.setProperty("group.id", UUID.randomUUID().toString());
            consumer = new KafkaConsumer(prop);
            consumer.subscribe(Collections.singletonList(this.getTopic()));
            block2: while (true) {
                if ((records = consumer.poll(this.getPollDuration())).isEmpty()) {
                    continue;
                }
                var4_5 = records.iterator();
                while (true) {
                    if (var4_5.hasNext()) ** break;
                    continue block2;
                    record = (ConsumerRecord)var4_5.next();
                    value = record.value();
                    MultiNodeKafkaResumeStrategy.LOG.trace("Read from Kafka: {}", value);
                    this.getResumeCache().add(record.key(), record.value());
                }
                break;
            }
        }
        catch (Exception e) {
            MultiNodeKafkaResumeStrategy.LOG.error("Error while refreshing the local cache: {}", (Object)e.getMessage(), (Object)e);
            return;
        }
    }

    @Override
    public void stop() {
        try {
            this.executorService.shutdown();
        }
        finally {
            super.stop();
        }
    }
}

