/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.resume.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Resumable;
import org.apache.camel.ResumeCache;
import org.apache.camel.Service;
import org.apache.camel.UpdatableConsumerResumeStrategy;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for resume strategies that keep their offsets in a Kafka topic.
 *
 * <p>Every call to {@link #updateLastOffset(Resumable)} publishes the key/offset pair to the
 * configured topic and mirrors it into the {@link ResumeCache}. On {@link #start()} the topic is
 * read back (via the subclass-provided {@link #subscribe()}) to repopulate the cache.
 *
 * <p>NOTE(review): both constructors call the overridable {@link #init()}, so a subclass override
 * runs before the subclass's own fields are initialized.
 *
 * @param <K> type of the record key (the addressable of the resumable)
 * @param <V> type of the record value (the offset)
 */
public abstract class AbstractKafkaResumeStrategy<K, V>
        implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service {
    public static final int UNLIMITED = -1;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);

    /** Number of polls {@link #consume()} attempts before it gives up and returns no records. */
    private static final int DEFAULT_CONSUME_RETRIES = 10;

    private final String topic;
    private final ResumeCache<K, V> resumeCache;
    private final Properties producerConfig;
    private final Properties consumerConfig;

    /**
     * One future per record handed to the producer. Entries are appended by
     * {@link #produce(Object, Object)}; nothing in this class ever removes them.
     */
    private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();

    /**
     * Failures reported by the send callback since the last {@link #produce(Object, Object)} call.
     * Atomic because Kafka may run the callback on a thread other than the caller's.
     */
    private final AtomicLong errorCount = new AtomicLong();

    private Consumer<K, V> consumer;
    private Producer<K, V> producer;
    private Duration pollDuration = Duration.ofSeconds(1L);
    private boolean subscribed;

    /**
     * Creates a strategy using the default String-based producer and consumer configuration.
     *
     * @param bootstrapServers Kafka bootstrap servers; must not be empty
     * @param topic            topic used to store the offsets
     * @param resumeCache      cache that mirrors the offsets stored in the topic
     */
    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
        this(topic, resumeCache, createProducer(bootstrapServers), createConsumer(bootstrapServers));
    }

    /**
     * Creates a strategy using caller-supplied producer and consumer configuration.
     *
     * @param topic          topic used to store the offsets
     * @param resumeCache    cache that mirrors the offsets stored in the topic
     * @param producerConfig configuration passed to the {@link KafkaProducer}
     * @param consumerConfig configuration passed to the {@link KafkaConsumer}
     */
    public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig,
                                       Properties consumerConfig) {
        this.topic = topic;
        this.resumeCache = resumeCache;
        this.producerConfig = producerConfig;
        this.consumerConfig = consumerConfig;
        this.init();
    }

    /**
     * Builds a producer configuration that serializes keys and values as strings.
     *
     * @param bootstrapServers Kafka bootstrap servers; validated with {@code StringHelper.notEmpty}
     * @return the producer properties
     */
    public static Properties createProducer(String bootstrapServers) {
        Properties config = new Properties();
        config.put("key.serializer", StringSerializer.class.getName());
        config.put("value.serializer", StringSerializer.class.getName());
        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
        config.put("bootstrap.servers", bootstrapServers);
        return config;
    }

    /**
     * Builds a consumer configuration that deserializes keys and values as strings, uses a random
     * (UUID) consumer group id and enables auto commit.
     *
     * @param bootstrapServers Kafka bootstrap servers; validated with {@code StringHelper.notEmpty}
     * @return the consumer properties
     */
    public static Properties createConsumer(String bootstrapServers) {
        Properties config = new Properties();
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
        config.put("bootstrap.servers", bootstrapServers);
        String groupId = UUID.randomUUID().toString();
        LOG.debug("Creating consumer with {}[{}]", "group.id", groupId);
        config.put("group.id", groupId);
        config.put("enable.auto.commit", Boolean.TRUE.toString());
        return config;
    }

    /**
     * Sends a record to the offsets topic without waiting for the broker acknowledgement.
     *
     * <p>The error counter is reset before the send; a failure reported by the send callback
     * increments it. The returned future is kept and exposed through {@link #getSentItems()}.
     *
     * @param key     the record key
     * @param message the record value
     * @throws ExecutionException   declared for API compatibility; not thrown by this implementation
     * @throws InterruptedException declared for API compatibility; not thrown by this implementation
     */
    public void produce(K key, V message) throws ExecutionException, InterruptedException {
        ProducerRecord<K, V> record = new ProducerRecord<>(this.topic, key, message);
        this.errorCount.set(0L);
        Future<RecordMetadata> future = this.producer.send(record, (recordMetadata, e) -> {
            if (e != null) {
                LOG.error("Failed to send message {}", e.getMessage(), e);
                this.errorCount.incrementAndGet();
            }
        });
        this.sentItems.add(future);
    }

    /**
     * Publishes the last offset of the given resumable to Kafka and then records it in the cache.
     *
     * @param offset the resumable carrying the addressable (key) and its last offset (value)
     * @throws Exception if sending the record fails
     */
    @Override
    public void updateLastOffset(Resumable<K, V> offset) throws Exception {
        K key = offset.getAddressable();
        V offsetValue = offset.getLastOffset().offset();
        LOG.debug("Updating offset on Kafka with key {} to {}", key, offsetValue);
        this.produce(key, offsetValue);
        this.resumeCache.add(key, offsetValue);
    }

    /**
     * Subscribes to the offsets topic, copies every record it can read into the cache and
     * unsubscribes again, even when reading fails.
     *
     * <p>Reading stops once {@link #consume()} returns no records. When the cache reports it is
     * full, the rest of the current batch is skipped but polling continues.
     * NOTE(review): confirm whether loading should stop altogether once the cache is full.
     *
     * @throws Exception if subscribing or consuming fails
     */
    protected void loadCache() throws Exception {
        this.subscribe();
        try {
            LOG.debug("Loading records from topic {}", this.topic);
            ConsumerRecords<K, V> records = this.consume();
            while (!records.isEmpty()) {
                for (ConsumerRecord<K, V> record : records) {
                    V value = record.value();
                    LOG.trace("Read from Kafka: {}", value);
                    this.resumeCache.add(record.key(), value);
                    if (this.resumeCache.isFull()) {
                        break;
                    }
                }
                records = this.consume();
            }
        } finally {
            this.unsubscribe();
        }
    }

    /**
     * Subscribes the consumer to the topic unless it is already subscribed.
     *
     * @param topic the topic to subscribe to
     */
    public void checkAndSubscribe(String topic) {
        if (this.subscribed) {
            return;
        }
        this.consumer.subscribe(Collections.singletonList(topic));
        this.subscribed = true;
    }

    /**
     * Subscribes the consumer to the topic unless it is already subscribed, positioning each
     * assigned partition so that only its last {@code remaining} records are read.
     *
     * @param topic     the topic to subscribe to
     * @param remaining how many records, counted from the end of each partition, should be read
     */
    public void checkAndSubscribe(String topic, final long remaining) {
        if (this.subscribed) {
            return;
        }
        this.consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to release when partitions are revoked
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
                AbstractKafkaResumeStrategy.this.seekToLastRecords(assignments, remaining);
            }
        });
        this.subscribed = true;
    }

    /** Positions every assigned partition {@code remaining} records before its end. */
    private void seekToLastRecords(Collection<TopicPartition> assignments, long remaining) {
        this.consumer.seekToEnd(assignments);
        for (TopicPartition assignment : assignments) {
            long endPosition = this.consumer.position(assignment);
            long startPosition = endPosition - remaining;
            if (startPosition >= 0L) {
                this.consumer.seek(assignment, startPosition);
            } else {
                // The partition holds fewer records than requested. Leaving the position at the end
                // (where seekToEnd put it) would skip all of them, so read from the start instead.
                LOG.info("Partition {} holds fewer than {} records: reading it from the beginning",
                        assignment, remaining);
                this.consumer.seekToBeginning(Collections.singletonList(assignment));
            }
        }
    }

    /**
     * Subscribes the consumer to the offsets topic. Implementations choose the subscription mode.
     *
     * @throws Exception if the subscription fails
     */
    public abstract void subscribe() throws Exception;

    /**
     * Unsubscribes the consumer from the topic. Failures are logged, not propagated. The strategy
     * is marked as unsubscribed afterwards so that a later {@code checkAndSubscribe} subscribes again.
     */
    public void unsubscribe() {
        try {
            this.consumer.unsubscribe();
        } catch (IllegalStateException e) {
            LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", this.topic);
        } catch (Exception e) {
            LOG.error("Error unsubscribing from the Kafka topic {}: {}", this.topic, e.getMessage(), e);
        } finally {
            this.subscribed = false;
        }
    }

    /**
     * Polls the topic, retrying up to 10 times while no records are returned.
     *
     * @return the first non-empty batch, or an empty batch if every poll came back empty
     */
    public ConsumerRecords<K, V> consume() {
        return this.consume(DEFAULT_CONSUME_RETRIES);
    }

    /**
     * Polls the topic up to {@code retries} times, each poll waiting up to the poll duration.
     *
     * @param retries maximum number of polls; zero or a negative value performs no poll at all
     * @return the first non-empty batch, or an empty batch if every poll came back empty
     */
    public ConsumerRecords<K, V> consume(int retries) {
        for (int attempt = 0; attempt < retries; attempt++) {
            ConsumerRecords<K, V> records = this.consumer.poll(this.pollDuration);
            if (!records.isEmpty()) {
                return records;
            }
        }
        return ConsumerRecords.empty();
    }

    /**
     * @return the number of send failures reported since the last {@link #produce(Object, Object)} call
     */
    public long getErrorCount() {
        return this.errorCount.get();
    }

    /**
     * @return a read-only view of the futures of every record sent so far
     */
    public List<Future<RecordMetadata>> getSentItems() {
        return Collections.unmodifiableList(this.sentItems);
    }

    @Override
    public void build() {
        Service.super.build();
    }

    /**
     * Creates the Kafka consumer and producer from their configurations if they do not exist yet.
     */
    @Override
    public void init() {
        Service.super.init();
        LOG.debug("Initializing the Kafka resume strategy");
        if (this.consumer == null) {
            this.consumer = new KafkaConsumer<>(this.consumerConfig);
        }
        if (this.producer == null) {
            this.producer = new KafkaProducer<>(this.producerConfig);
        }
    }

    /** Does nothing: the Kafka clients are kept open and only released by {@link #close()}. */
    @Override
    public void stop() {
    }

    /**
     * Runs the default {@link Service} close logic and then releases the Kafka producer and
     * consumer. Failures while closing a client are logged so that both clients are always attempted.
     *
     * @throws IOException if the default {@link Service} close logic fails
     */
    @Override
    public void close() throws IOException {
        try {
            Service.super.close();
        } finally {
            closeQuietly(this.producer, "producer");
            closeQuietly(this.consumer, "consumer");
        }
    }

    /** Closes a Kafka client if present, logging (not propagating) any failure. */
    private static void closeQuietly(AutoCloseable client, String description) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception e) {
            LOG.warn("Error closing the Kafka {}: {}", description, e.getMessage(), e);
        }
    }

    /**
     * Starts the strategy by loading the previously stored offsets into the cache. A failure to
     * load is logged and does not prevent the strategy from starting.
     */
    @Override
    public void start() {
        LOG.info("Starting the kafka resume strategy");
        try {
            this.loadCache();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // keep the interruption visible to the caller instead of swallowing it
                Thread.currentThread().interrupt();
            }
            LOG.error("Failed to load already processed items: {}", e.getMessage(), e);
        }
    }

    /**
     * @return how long each poll waits for records
     */
    public Duration getPollDuration() {
        return this.pollDuration;
    }

    /**
     * @param pollDuration how long each poll waits for records; must not be {@code null}
     * @throws NullPointerException if {@code pollDuration} is {@code null}
     */
    public void setPollDuration(Duration pollDuration) {
        this.pollDuration = Objects.requireNonNull(pollDuration, "The poll duration cannot be null");
    }

    /**
     * @return the Kafka consumer used to read the offsets topic
     */
    protected Consumer<K, V> getConsumer() {
        return this.consumer;
    }

    /**
     * @return the Kafka producer used to write to the offsets topic
     */
    protected Producer<K, V> getProducer() {
        return this.producer;
    }
}

