/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.jes.util;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonObject;
import net.pincette.json.JsonUtil;
import net.pincette.util.Pair;
import net.pincette.util.StreamUtil;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

/**
 * Static helpers for working with Kafka: building producer configurations, correlating streams on
 * a correlation ID, sending records asynchronously and computing consumer group message lag
 * through the admin client.
 *
 * <p>The class cannot be instantiated.
 */
public class Kafka {
    /** Prefix that marks an environment variable as a Kafka setting. */
    private static final String KAFKA_PREFIX = "KAFKA_";

    /** Name of the JSON field that carries the correlation ID used by {@link #correlate}. */
    private static final String CORRELATION_FIELD = "_corr";

    /**
     * Producer settings that are merged with the caller's configuration in
     * {@link #createReliableProducer}: acknowledgement by all replicas, idempotence, a 5000 ms
     * request timeout and one in-flight request per connection.
     */
    private static final Map<String, Object> RELIABLE_PRODUCER_CONFIG = Collections.unmodifiableMap(net.pincette.util.Collections.map(Pair.pair("acks", "all"), Pair.pair("enable.idempotence", true), Pair.pair("request.timeout.ms", 5000), Pair.pair("max.in.flight.requests.per.connection", 1)));

    private Kafka() {
    }

    /**
     * Joins two JSON streams on the lower-cased value of their <code>_corr</code> field.
     *
     * <p>Records that have no value, or whose <code>_corr</code> field is absent or not a JSON
     * string, take no part in the join.
     *
     * @param stream1 the left stream.
     * @param stream2 the right stream.
     * @param window the join window that is handed to {@link JoinWindows#of(Duration)}.
     * @return a stream keyed by correlation ID with the pair (left value, right value).
     */
    public static KStream<String, Pair<JsonObject, JsonObject>> correlate(KStream<String, JsonObject> stream1, KStream<String, JsonObject> stream2, Duration window) {
        return toCorr(stream1).join(toCorr(stream2), Pair::pair, JoinWindows.of(window));
    }

    /**
     * Creates a producer from the given configuration combined with the reliability settings of
     * this class. Only entries accepted by {@link #producerConfig(Map)} reach the producer.
     *
     * <p>NOTE(review): which side wins when a key occurs in both maps depends on
     * <code>net.pincette.util.Collections.merge</code>; presumably the later map, confirm there.
     *
     * @param config the caller's configuration.
     * @param keySerializer the serializer for record keys.
     * @param valueSerializer the serializer for record values.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the new producer.
     */
    public static <K, V> KafkaProducer<K, V> createReliableProducer(Map<String, Object> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        return new KafkaProducer<>(
                producerConfig(net.pincette.util.Collections.merge(config, RELIABLE_PRODUCER_CONFIG)),
                keySerializer,
                valueSerializer);
    }

    /**
     * Converts the configuration section at <code>path</code> to a map with unwrapped values.
     *
     * @param config the configuration object.
     * @param path the path of the section within <code>config</code>.
     * @return the entries of the section, keyed by their path relative to the section.
     */
    public static Map<String, Object> fromConfig(Config config, String path) {
        return config.getConfig(path).entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped()));
    }

    /**
     * Collects all environment variables that start with <code>KAFKA_</code> and converts their
     * names to property names: the prefix is dropped, the remainder is lower-cased and underscores
     * become dots, e.g. <code>KAFKA_BOOTSTRAP_SERVERS</code> yields <code>bootstrap.servers</code>.
     *
     * @return the property names with the values of the corresponding variables.
     * @throws IllegalStateException when two variables map to the same property name.
     */
    public static Map<String, Object> fromEnv() {
        return kafkaEnv()
                .collect(Collectors.toMap(e -> kafkaProperty(e.getKey()), Map.Entry::getValue));
    }

    /**
     * Fetches the committed offsets of one consumer group.
     *
     * @param groupId the ID of the consumer group.
     * @param admin the admin client.
     * @return the group ID paired with its committed offset per partition.
     */
    private static CompletionStage<Pair<String, Map<TopicPartition, Long>>> getCommittedOffsets(String groupId, Admin admin) {
        return wrap(admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata())
                .thenApply(offsets -> Pair.pair(groupId, toLong(offsets, OffsetAndMetadata::offset)));
    }

    /**
     * Fetches the committed offsets of all given consumer groups.
     *
     * @param groups the consumer groups to query.
     * @param admin the admin client.
     * @return the committed offset per partition, per group ID.
     */
    private static CompletionStage<Map<String, Map<TopicPartition, Long>>> getConsumerGroupOffsets(Stream<ConsumerGroupListing> groups, Admin admin) {
        return StreamUtil.composeAsyncStream(
                        groups.map(ConsumerGroupListing::groupId).map(id -> getCommittedOffsets(id, admin)))
                .thenApply(pairs -> pairs.collect(Collectors.toMap(p -> p.first, p -> p.second)));
    }

    /**
     * Lists the partitions of all topics that are not internal.
     *
     * @param admin the admin client.
     * @return the partitions.
     */
    private static CompletionStage<Collection<TopicPartition>> getTopicPartitions(Admin admin) {
        return wrap(admin.listTopics().listings())
                .thenApply(Kafka::nonInternal)
                .thenComposeAsync(names -> wrap(admin.describeTopics(names).all()))
                .thenApply(topics -> toPartitions(topics.values()));
    }

    /**
     * Fetches the latest offset of every partition of every non-internal topic.
     *
     * @param admin the admin client.
     * @return the latest offset per partition.
     */
    private static CompletionStage<Map<TopicPartition, Long>> getTopicPartitionOffsets(Admin admin) {
        return getTopicPartitions(admin)
                .thenApply(Kafka::latest)
                .thenComposeAsync(specs -> wrap(admin.listOffsets(specs).all()))
                .thenApply(offsets -> toLong(offsets, ListOffsetsResult.ListOffsetsResultInfo::offset));
    }

    /** Streams the environment variables whose name starts with <code>KAFKA_</code>. */
    private static Stream<Map.Entry<String, String>> kafkaEnv() {
        return System.getenv().entrySet().stream()
                .filter(e -> e.getKey().startsWith(KAFKA_PREFIX));
    }

    /**
     * Converts the name of a <code>KAFKA_</code> environment variable to a property name.
     *
     * @param env the variable name, which must start with the prefix.
     * @return the lower-cased, dot-separated property name.
     */
    private static String kafkaProperty(String env) {
        // Locale.ROOT keeps the result independent of the default locale; with a Turkish locale
        // "I" would otherwise become a dotless "ı" and produce an unknown property name.
        return Arrays.stream(env.substring(KAFKA_PREFIX.length()).split("_"))
                .map(part -> part.toLowerCase(Locale.ROOT))
                .collect(Collectors.joining("."));
    }

    /** Maps every given partition to the "latest offset" specification. */
    private static Map<TopicPartition, OffsetSpec> latest(Collection<TopicPartition> partitions) {
        return partitions.stream().collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest()));
    }

    /**
     * Computes the message lag of all valid consumer groups.
     *
     * @param admin the admin client.
     * @return the lag per partition, per consumer group ID.
     * @see #messageLag(Admin, Predicate)
     */
    public static CompletionStage<Map<String, Map<TopicPartition, Long>>> messageLag(Admin admin) {
        return messageLag(admin, (String group) -> true);
    }

    /**
     * Computes the message lag of the valid consumer groups that are accepted by the predicate.
     * The lag of a partition is its latest offset minus the offset the group has committed for
     * it. Partitions for which a group has no committed offset, and partitions of internal
     * topics, are absent from the result.
     *
     * @param admin the admin client.
     * @param includeGroup the test that is applied to each consumer group ID.
     * @return the lag per partition, per consumer group ID.
     */
    public static CompletionStage<Map<String, Map<TopicPartition, Long>>> messageLag(Admin admin, Predicate<String> includeGroup) {
        return wrap(admin.listConsumerGroups().valid())
                .thenComposeAsync(groups ->
                        getConsumerGroupOffsets(
                                groups.stream().filter(g -> includeGroup.test(g.groupId())), admin))
                .thenComposeAsync(groupOffsets ->
                        getTopicPartitionOffsets(admin)
                                .thenApply(partitionOffsets ->
                                        messageLagPerGroup(groupOffsets, partitionOffsets)));
    }

    /**
     * Subtracts the committed offsets from the latest offsets.
     *
     * @param offsets the committed offset per partition.
     * @param latest the latest offset per partition.
     * @return the difference for every partition of <code>offsets</code> that also occurs in
     *     <code>latest</code>.
     */
    private static Map<TopicPartition, Long> messageLag(Map<TopicPartition, Long> offsets, Map<TopicPartition, Long> latest) {
        return offsets.entrySet().stream()
                // Partitions without a known latest offset can't have their lag calculated.
                .filter(e -> latest.get(e.getKey()) != null)
                .collect(Collectors.toMap(
                        Map.Entry::getKey, e -> latest.get(e.getKey()) - e.getValue()));
    }

    /**
     * Computes the lag per partition for every consumer group.
     *
     * @param consumerGroupOffsets the committed offset per partition, per group ID.
     * @param partitionOffsets the latest offset per partition.
     * @return the lag per partition, per group ID.
     */
    private static Map<String, Map<TopicPartition, Long>> messageLagPerGroup(Map<String, Map<TopicPartition, Long>> consumerGroupOffsets, Map<TopicPartition, Long> partitionOffsets) {
        return consumerGroupOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey, e -> messageLag(e.getValue(), partitionOffsets)));
    }

    /** Returns the names of the topics that are not flagged as internal. */
    private static Collection<String> nonInternal(Collection<TopicListing> topics) {
        return topics.stream()
                .filter(t -> !t.isInternal())
                .map(TopicListing::name)
                .collect(Collectors.toList());
    }

    /**
     * Reduces a configuration to the entries a producer accepts: the names reported by
     * {@link ProducerConfig#configNames()} plus <code>sasl.jaas.config</code>,
     * <code>sasl.mechanism</code> and <code>ssl.endpoint.identification.algorithm</code>.
     *
     * @param config the configuration to filter.
     * @return a new map with only the accepted entries.
     */
    private static Map<String, Object> producerConfig(Map<String, Object> config) {
        Set<String> names = net.pincette.util.Collections.union(ProducerConfig.configNames(), net.pincette.util.Collections.set("sasl.jaas.config", "sasl.mechanism", "ssl.endpoint.identification.algorithm"));

        return config.entrySet().stream()
                .filter(e -> names.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Sends a record and reports the outcome through a completion stage.
     *
     * <p>Exceptions thrown directly by the <code>send</code> call of the producer are not
     * captured in the returned stage; they propagate to the caller.
     *
     * @param producer the producer to send with.
     * @param rec the record to send.
     * @param <K> the key type.
     * @param <V> the value type.
     * @return a stage that completes with <code>true</code> when the send callback reports no
     *     exception and completes exceptionally with that exception otherwise.
     */
    public static <K, V> CompletionStage<Boolean> send(KafkaProducer<K, V> producer, ProducerRecord<K, V> rec) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();

        producer.send(rec, (metadata, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        });

        return result;
    }

    /**
     * Re-keys a stream on the lower-cased value of the <code>_corr</code> field.
     *
     * @param stream the stream to re-key.
     * @return the records that carry a string-valued <code>_corr</code> field, keyed by it.
     */
    private static KStream<String, JsonObject> toCorr(KStream<String, JsonObject> stream) {
        return stream
                // getString with a default yields null for an absent field as well as for a field
                // that isn't a JSON string, so neither case can throw in the mapping step. Records
                // without a value (tombstones) are dropped too.
                .filter((k, v) -> v != null && v.getString(CORRELATION_FIELD, null) != null)
                .map((k, v) -> new KeyValue<String, JsonObject>(
                        v.getString(CORRELATION_FIELD).toLowerCase(Locale.ROOT), v));
    }

    /**
     * Renders message lags as JSON: one field per consumer group, containing one object per
     * topic, containing one field per partition number with the lag as its value.
     *
     * @param messageLags the lag per partition, per consumer group ID.
     * @return the JSON representation.
     */
    public static JsonObject toJson(Map<String, Map<TopicPartition, Long>> messageLags) {
        // The stream is sequential, so the combiner is never asked to merge two builders.
        return messageLags.entrySet().stream()
                .reduce(
                        JsonUtil.createObjectBuilder(),
                        (builder, e) -> builder.add(e.getKey(), toJsonPerPartition(e.getValue())),
                        (b1, b2) -> b1)
                .build();
    }

    /**
     * Renders the lags of one consumer group as JSON, grouped by topic and then by partition
     * number.
     *
     * @param messageLags the lag per partition.
     * @return the JSON representation.
     */
    private static JsonObject toJsonPerPartition(Map<TopicPartition, Long> messageLags) {
        Map<String, Map<String, Long>> perTopic = messageLags.entrySet().stream()
                .collect(Collectors.groupingBy(
                        e -> e.getKey().topic(),
                        Collectors.toMap(
                                e -> String.valueOf(e.getKey().partition()), Map.Entry::getValue)));

        return JsonUtil.from(perTopic);
    }

    /**
     * Extracts a numeric offset from every value of a per-partition map.
     *
     * @param offsets the per-partition values; values may be <code>null</code>.
     * @param fn the function that extracts the offset from a value.
     * @param <T> the value type.
     * @return the offset per partition, without the partitions that had a <code>null</code> value.
     */
    private static <T> Map<TopicPartition, Long> toLong(Map<TopicPartition, T> offsets, ToLongFunction<T> fn) {
        return offsets.entrySet().stream()
                // The admin client reports null for a partition without a committed offset;
                // applying fn to it would throw and fail the whole lag calculation.
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, e -> fn.applyAsLong(e.getValue())));
    }

    /** Expands topic descriptions into the list of all their partitions. */
    private static Collection<TopicPartition> toPartitions(Collection<TopicDescription> topics) {
        return topics.stream()
                .flatMap(t -> t.partitions().stream()
                        .map(p -> new TopicPartition(t.name(), p.partition())))
                .collect(Collectors.toList());
    }

    /**
     * Adapts a Kafka future to a completion stage.
     *
     * @param future the Kafka future.
     * @param <T> the result type.
     * @return a stage that completes with the value of the future or completes exceptionally
     *     with its error.
     */
    public static <T> CompletionStage<T> wrap(KafkaFuture<T> future) {
        CompletableFuture<T> result = new CompletableFuture<>();

        future.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });

        return result;
    }
}

