/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.idempotent.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.processor.idempotent.kafka.KafkaConsumerUtil;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IdempotentRepository} that keeps message ids in a local LRU cache and records every
 * change (add / remove / clear) as a record on a Kafka topic.
 *
 * <p>On start the topic is replayed from the beginning up to the end offsets observed at that
 * moment, so the local cache reflects the actions previously written to the topic. After start the
 * local cache is only changed by calls made on this instance; each such change is also published to
 * the topic.
 *
 * <p>Record layout on the topic: the record key is the message id and the record value is the name
 * of a {@link CacheAction}.
 */
@ManagedResource(description="Kafka IdempotentRepository")
public class KafkaIdempotentRepository
extends ServiceSupport
implements IdempotentRepository,
CamelContextAware {
    private static final int DEFAULT_MAXIMUM_CACHE_SIZE = 1000;
    private static final int DEFAULT_POLL_DURATION_MS = 100;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private String topic;
    private String bootstrapServers;
    private String groupId;
    private Properties producerConfig;
    private Properties consumerConfig;
    private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
    private int pollDurationMs = DEFAULT_POLL_DURATION_MS;
    private Map<String, Object> cache;
    private Consumer<String, String> consumer;
    private Producer<String, String> producer;
    private CamelContext camelContext;

    /**
     * Creates an unconfigured repository; the topic and either the bootstrap servers or the
     * consumer/producer configs must be set before the service is started.
     */
    public KafkaIdempotentRepository() {
    }

    /**
     * @param topic            topic used to record and replay cache actions
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} setting
     */
    public KafkaIdempotentRepository(String topic, String bootstrapServers) {
        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
    }

    /**
     * @deprecated the group id is only stored; nothing in this class reads it when building the
     *             Kafka clients.
     */
    @Deprecated
    public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) {
        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
    }

    /**
     * @param topic            topic used to record and replay cache actions
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} setting
     * @param maxCacheSize     size passed to the LRU cache factory
     * @param pollDurationMs   timeout, in milliseconds, of each poll while replaying the topic
     */
    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs) {
        this.topic = topic;
        this.bootstrapServers = bootstrapServers;
        this.maxCacheSize = maxCacheSize;
        this.pollDurationMs = pollDurationMs;
    }

    /**
     * @param topic          topic used to record and replay cache actions
     * @param consumerConfig properties used to create the Kafka consumer
     * @param producerConfig properties used to create the Kafka producer
     */
    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) {
        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
    }

    /**
     * @deprecated the group id is only stored; nothing in this class reads it when building the
     *             Kafka clients.
     */
    @Deprecated
    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) {
        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
    }

    /**
     * @param topic          topic used to record and replay cache actions
     * @param consumerConfig properties used to create the Kafka consumer
     * @param producerConfig properties used to create the Kafka producer
     * @param maxCacheSize   size passed to the LRU cache factory
     * @param pollDurationMs timeout, in milliseconds, of each poll while replaying the topic
     */
    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs) {
        this.topic = topic;
        this.consumerConfig = consumerConfig;
        this.producerConfig = producerConfig;
        this.maxCacheSize = maxCacheSize;
        this.pollDurationMs = pollDurationMs;
    }

    /**
     * @deprecated the group id is only stored; nothing in this class reads it when building the
     *             Kafka clients.
     */
    @Deprecated
    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) {
        this(topic, bootstrapServers, maxCacheSize, pollDurationMs);
        this.groupId = groupId;
    }

    /**
     * @deprecated the group id is only stored; nothing in this class reads it when building the
     *             Kafka clients.
     */
    @Deprecated
    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs, String groupId) {
        this(topic, consumerConfig, producerConfig, maxCacheSize, pollDurationMs);
        this.groupId = groupId;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getBootstrapServers() {
        return this.bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public Properties getProducerConfig() {
        return this.producerConfig;
    }

    public void setProducerConfig(Properties producerConfig) {
        this.producerConfig = producerConfig;
    }

    public Properties getConsumerConfig() {
        return this.consumerConfig;
    }

    public void setConsumerConfig(Properties consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    public int getMaxCacheSize() {
        return this.maxCacheSize;
    }

    public void setMaxCacheSize(int maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
    }

    public int getPollDurationMs() {
        return this.pollDurationMs;
    }

    public void setPollDurationMs(int pollDurationMs) {
        this.pollDurationMs = pollDurationMs;
    }

    /** @deprecated the group id is stored but not used by this class. */
    @Deprecated
    public String getGroupId() {
        return this.groupId;
    }

    /** @deprecated the group id is stored but not used by this class. */
    @Deprecated
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    /**
     * Validates the configuration, creates the local cache and the Kafka consumer/producer, and
     * replays the topic into the cache.
     *
     * <p>When a consumer or producer config was not supplied, one is built from
     * {@code bootstrapServers}. Note that supplied {@link Properties} objects are modified in place:
     * the (de)serializers and {@code enable.auto.commit} are always overwritten, while producer
     * {@code acks} and {@code batch.size} are only set when absent.
     */
    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.camelContext, "camelContext");
        StringHelper.notEmpty(this.topic, "topic");
        this.cache = LRUCacheFactory.newLRUCache(this.maxCacheSize);

        if (this.consumerConfig == null) {
            this.consumerConfig = this.newBootstrapOnlyConfig();
        }
        if (this.producerConfig == null) {
            this.producerConfig = this.newBootstrapOnlyConfig();
        }

        this.consumerConfig.put("enable.auto.commit", Boolean.FALSE.toString());
        this.consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        this.consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(this.consumerConfig);

        this.producerConfig.put("key.serializer", StringSerializer.class.getName());
        this.producerConfig.put("value.serializer", StringSerializer.class.getName());
        this.producerConfig.putIfAbsent("acks", "1");
        this.producerConfig.putIfAbsent("batch.size", "0");
        this.producer = new KafkaProducer<>(this.producerConfig);

        this.populateCache();
    }

    /** Builds a client config holding only {@code bootstrap.servers}; fails if it is not set. */
    private Properties newBootstrapOnlyConfig() {
        StringHelper.notEmpty(this.bootstrapServers, "bootstrapServers");
        Properties config = new Properties();
        config.put("bootstrap.servers", this.bootstrapServers);
        return config;
    }

    /**
     * Replays every partition of the topic from the beginning until the end offsets captured before
     * polling are reached, applying each record to the local cache.
     */
    private void populateCache() {
        this.log.debug("Getting partitions of topic {}", this.topic);
        Collection<TopicPartition> partitions = this.consumer.partitionsFor(this.topic).stream()
                .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                .collect(Collectors.toUnmodifiableList());

        this.log.debug("Assigning consumer to partitions {}", partitions);
        this.consumer.assign(partitions);

        this.log.debug("Seeking consumer to beginning of partitions {}", partitions);
        this.consumer.seekToBeginning(partitions);

        // End offsets are captured once, so records appended while replaying are not waited for.
        Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(partitions);
        this.log.debug("Consuming records from partitions {} till end offsets {}", partitions, endOffsets);
        while (!KafkaConsumerUtil.isReachedOffsets(this.consumer, endOffsets)) {
            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(this.pollDurationMs));
            for (ConsumerRecord<String, String> record : records) {
                this.addToCache(record);
            }
        }
    }

    /**
     * Applies one replayed record to the local cache.
     *
     * @param consumerRecord record whose key is the message id and whose value names a
     *                       {@link CacheAction}
     * @throws RuntimeCamelException if the value is {@code null} or does not name a
     *                               {@link CacheAction}
     */
    private void addToCache(ConsumerRecord<String, String> consumerRecord) {
        String messageId = consumerRecord.key();
        String rawAction = consumerRecord.value();

        CacheAction action;
        try {
            // Enum.valueOf throws NullPointerException for a null name, so check explicitly and
            // route both failure modes through the same reporting path.
            if (rawAction == null) {
                throw new IllegalArgumentException("action value is null");
            }
            action = CacheAction.valueOf(rawAction);
        }
        catch (IllegalArgumentException iax) {
            this.log.error("Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
                    rawAction, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            throw new RuntimeCamelException("Illegal action " + rawAction + " for key " + messageId, iax);
        }

        switch (action) {
            case add:
                this.log.debug("Adding to cache messageId:{}", messageId);
                this.cache.put(messageId, messageId);
                break;
            case remove:
                this.log.debug("Removing from cache messageId:{}", messageId);
                this.cache.remove(messageId);
                break;
            case clear:
                this.cache.clear();
                break;
            default:
                throw new RuntimeCamelException("Illegal action " + action + " for key " + messageId);
        }
    }

    /** Closes the Kafka consumer and producer. */
    protected void doStop() {
        IOHelper.close(this.consumer, "consumer", this.log);
        IOHelper.close(this.producer, "producer", this.log);
    }

    /**
     * Adds the key to the local cache and publishes an {@code add} action, unless the key is
     * already cached.
     *
     * @return {@code false} if the key was already in the local cache, {@code true} otherwise
     * @throws RuntimeCamelException if publishing the action fails or is interrupted
     */
    public boolean add(String key) {
        if (this.cache.containsKey(key)) {
            return false;
        }
        this.cache.put(key, key);
        this.broadcastAction(key, CacheAction.add);
        return true;
    }

    /**
     * Publishes the action for the key to the topic and blocks until the send completes.
     *
     * @throws RuntimeCamelException wrapping the send failure, or the interruption (the thread's
     *                               interrupt flag is restored first)
     */
    private void broadcastAction(String key, CacheAction action) {
        try {
            this.log.debug("Broadcasting action:{} for key:{}", action, key);
            ObjectHelper.notNull(this.producer, "producer");
            this.producer.send(new ProducerRecord<>(this.topic, key, action.toString())).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeCamelException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeCamelException(e);
        }
    }

    /** Checks the local cache only; the topic is not consulted. */
    @ManagedOperation(description="Does the store contain the given key")
    public boolean contains(String key) {
        this.log.debug("Checking cache for key:{}", key);
        return this.cache.containsKey(key);
    }

    /**
     * Removes the key from the local cache and publishes a {@code remove} action.
     *
     * @return always {@code true}, whether or not the key was cached
     * @throws RuntimeCamelException if publishing the action fails or is interrupted
     */
    @ManagedOperation(description="Remove the key from the store")
    public boolean remove(String key) {
        this.cache.remove(key, key);
        this.broadcastAction(key, CacheAction.remove);
        return true;
    }

    /** No-op; always returns {@code true}. */
    public boolean confirm(String key) {
        return true;
    }

    /**
     * Empties the local cache and publishes a {@code clear} action (with a {@code null} record
     * key).
     *
     * <p>The local cache is cleared here because, after start, nothing in this class reads the
     * topic again; broadcasting alone would leave previously added keys visible to
     * {@link #contains(String)} on this instance.
     *
     * @throws RuntimeCamelException if publishing the action fails or is interrupted
     */
    public void clear() {
        this.cache.clear();
        this.broadcastAction(null, CacheAction.clear);
    }

    /** Actions recorded on the topic; the constant name is used verbatim as the record value. */
    static enum CacheAction {
        add,
        remove,
        clear;

    }
}

