/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.scheduler.metric;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.icthh.xm.commons.config.client.repository.TenantListRepository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

@Component
@ConditionalOnProperty(value={"application.scheduler-enabled"})
public class KafkaOffsetsMetric
implements MetricSet {
    private static final Logger log = LoggerFactory.getLogger(KafkaOffsetsMetric.class);
    private static final String QUEUE = "_queue";
    private static final String DELIMITER = "_";
    private static final String METRIC_NAME = "kafka.offsets.";
    private static final String TOPIC_PREFIX = "scheduler_";
    @Value(value="${application.scheduler-config.kafka-offsets-metric-timeout:5}")
    private int timeout;
    @Value(value="${spring.application.name}")
    private String appName;
    private final TenantListRepository tenantListRepository;
    private final KafkaBinderConfigurationProperties binderConfigurationProperties;
    private ConsumerFactory<?, ?> defaultConsumerFactory;
    private Consumer<?, ?> consumer;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Offsets calculateConsumerOffsetsOnTopic(String topic, String group) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Offsets> future = exec.submit(() -> {
            long totalCurrentOffset = 0L;
            long totalEndOffset = 0L;
            try {
                Consumer<?, ?> consumer;
                if (this.consumer == null) {
                    consumer = this;
                    synchronized (consumer) {
                        if (this.consumer == null) {
                            this.consumer = this.createConsumerFactory(group).createConsumer();
                        }
                    }
                }
                consumer = this.consumer;
                synchronized (consumer) {
                    List partitionInfos = this.consumer.partitionsFor(topic);
                    LinkedList<TopicPartition> topicPartitions = new LinkedList<TopicPartition>();
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                    }
                    Map endOffsets = this.consumer.endOffsets(topicPartitions);
                    for (Map.Entry endOffset : endOffsets.entrySet()) {
                        OffsetAndMetadata current = this.consumer.committed((TopicPartition)endOffset.getKey());
                        if (current != null) {
                            totalEndOffset += ((Long)endOffset.getValue()).longValue();
                            totalCurrentOffset += current.offset();
                            continue;
                        }
                        totalEndOffset += ((Long)endOffset.getValue()).longValue();
                    }
                }
            }
            catch (Exception e) {
                log.debug("Cannot generate metric for topic: " + topic, (Throwable)e);
            }
            return (KafkaOffsetsMetric)this.new Offsets(totalEndOffset - totalCurrentOffset, totalCurrentOffset, totalEndOffset);
        });
        try {
            Offsets offsets = future.get(this.timeout, TimeUnit.SECONDS);
            return offsets;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Offsets offsets = new Offsets(Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE);
            return offsets;
        }
        catch (ExecutionException | TimeoutException e) {
            Offsets offsets = new Offsets(Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE);
            return offsets;
        }
        finally {
            exec.shutdownNow();
        }
    }

    private ConsumerFactory<?, ?> createConsumerFactory(String group) {
        if (this.defaultConsumerFactory == null) {
            HashMap<String, Object> props = new HashMap<String, Object>();
            props.put("key.deserializer", ByteArrayDeserializer.class);
            props.put("value.deserializer", ByteArrayDeserializer.class);
            if (!ObjectUtils.isEmpty((Object)this.binderConfigurationProperties.getConsumerProperties())) {
                props.putAll(this.binderConfigurationProperties.getConsumerProperties());
            }
            if (!props.containsKey("bootstrap.servers")) {
                props.put("bootstrap.servers", this.binderConfigurationProperties.getKafkaConnectionString());
            }
            props.put("group.id", group);
            this.defaultConsumerFactory = new DefaultKafkaConsumerFactory(props);
        }
        return this.defaultConsumerFactory;
    }

    public Map<String, Metric> getMetrics() {
        HashMap<String, Metric> metrics = new HashMap<String, Metric>();
        Set tenants = this.tenantListRepository.getTenants();
        tenants.forEach(tenantName -> {
            String topic = TOPIC_PREFIX + tenantName.toLowerCase() + DELIMITER + this.appName + QUEUE;
            metrics.put(METRIC_NAME + topic, () -> this.calculateConsumerOffsetsOnTopic(topic, this.appName));
        });
        return metrics;
    }

    public KafkaOffsetsMetric(TenantListRepository tenantListRepository, KafkaBinderConfigurationProperties binderConfigurationProperties) {
        this.tenantListRepository = tenantListRepository;
        this.binderConfigurationProperties = binderConfigurationProperties;
    }

    private class Offsets {
        private final long totalLag;
        private final long totalCurrentOffset;
        private final long totalEndOffset;

        public long getTotalLag() {
            return this.totalLag;
        }

        public long getTotalCurrentOffset() {
            return this.totalCurrentOffset;
        }

        public long getTotalEndOffset() {
            return this.totalEndOffset;
        }

        public Offsets(long totalLag, long totalCurrentOffset, long totalEndOffset) {
            this.totalLag = totalLag;
            this.totalCurrentOffset = totalCurrentOffset;
            this.totalEndOffset = totalEndOffset;
        }
    }
}

