/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class KafkaThrottledLatestProcessedCommit
extends ContextHolder
implements KafkaCommitHandler {
    private static final Map<String, Map<Integer, TopicPartition>> TOPIC_PARTITIONS_CACHE = new ConcurrentHashMap<String, Map<Integer, TopicPartition>>();
    private final Map<TopicPartition, OffsetStore> offsetStores = new HashMap<TopicPartition, OffsetStore>();
    private final KafkaConsumer<?, ?> consumer;
    private final KafkaSource<?, ?> source;
    private final int unprocessedRecordMaxAge;
    private final int autoCommitInterval;
    private volatile long timerId = -1L;

    private KafkaThrottledLatestProcessedCommit(Vertx vertx, KafkaConsumer<?, ?> consumer, KafkaSource<?, ?> source, int unprocessedRecordMaxAge, int autoCommitInterval) {
        super(vertx);
        this.consumer = consumer;
        this.source = source;
        this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
        this.autoCommitInterval = autoCommitInterval;
    }

    public static void clearCache() {
        TOPIC_PARTITIONS_CACHE.clear();
    }

    public static KafkaThrottledLatestProcessedCommit create(Vertx vertx, KafkaConsumer<?, ?> consumer, String groupId, KafkaConnectorIncomingConfiguration config, KafkaSource<?, ?> source) {
        int unprocessedRecordMaxAge = config.getThrottledUnprocessedRecordMaxAgeMs();
        int autoCommitInterval = config.config().getOptionalValue("auto.commit.interval.ms", Integer.class).orElse(5000);
        KafkaLogging.log.settingCommitInterval(groupId, autoCommitInterval);
        if (unprocessedRecordMaxAge <= 0) {
            KafkaLogging.log.disableThrottledCommitStrategyHealthCheck(groupId);
        } else {
            KafkaLogging.log.setThrottledCommitStrategyReceivedRecordMaxAge(groupId, unprocessedRecordMaxAge);
        }
        return new KafkaThrottledLatestProcessedCommit(vertx, consumer, source, unprocessedRecordMaxAge, autoCommitInterval);
    }

    private <K, V> TopicPartition getTopicPartition(IncomingKafkaRecord<K, V> record) {
        return TOPIC_PARTITIONS_CACHE.computeIfAbsent(record.getTopic(), topic -> new ConcurrentHashMap()).computeIfAbsent(record.getPartition(), partition -> new TopicPartition(record.getTopic(), partition.intValue()));
    }

    private OffsetStore getOffsetStore(TopicPartition topicPartition) {
        return this.offsetStores.computeIfAbsent(topicPartition, k -> new OffsetStore((TopicPartition)k, this.unprocessedRecordMaxAge));
    }

    @Override
    public void partitionsAssigned(Set<TopicPartition> partitions) {
        this.offsetStores.clear();
        this.stopFlushAndCheckHealthTimer();
        if (!partitions.isEmpty()) {
            this.startFlushAndCheckHealthTimer();
        }
    }

    private void stopFlushAndCheckHealthTimer() {
        if (this.timerId != -1L) {
            this.vertx.cancelTimer(this.timerId);
            this.timerId = -1L;
        }
    }

    private void startFlushAndCheckHealthTimer() {
        this.timerId = this.vertx.setTimer((long)this.autoCommitInterval, this::flushAndCheckHealth);
    }

    @Override
    public <K, V> IncomingKafkaRecord<K, V> received(IncomingKafkaRecord<K, V> record) {
        TopicPartition recordsTopicPartition = this.getTopicPartition(record);
        this.getOffsetStore(recordsTopicPartition).received(record.getOffset());
        if (this.timerId < 0L) {
            this.startFlushAndCheckHealthTimer();
        }
        return record;
    }

    private Map<TopicPartition, Long> clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping() {
        HashMap<TopicPartition, Long> offsetsMapping = new HashMap<TopicPartition, Long>();
        this.offsetStores.forEach((topicPartition, value) -> value.clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset().ifPresent(offset -> offsetsMapping.put((TopicPartition)topicPartition, offset)));
        return offsetsMapping;
    }

    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> record) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.runOnContext(() -> {
            this.offsetStores.get(this.getTopicPartition(record)).processed(record.getOffset());
            future.complete(null);
        });
        return future;
    }

    private void flushAndCheckHealth(long timerId) {
        Map<TopicPartition, Long> offsetsMapping = this.clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping();
        if (!offsetsMapping.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsets = offsetsMapping.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata().setOffset((Long)e.getValue() + 1L)));
            this.consumer.getDelegate().commit(offsets, a -> this.startFlushAndCheckHealthTimer());
        } else {
            this.startFlushAndCheckHealthTimer();
        }
        if (this.unprocessedRecordMaxAge > 0) {
            this.offsetStores.values().stream().filter(OffsetStore::hasTooManyMessagesWithoutAck).forEach(o -> this.source.reportFailure(new TooManyMessagesWithoutAckException()));
        }
    }

    @Override
    public void terminate() {
        this.stopFlushAndCheckHealthTimer();
    }

    public static class TooManyMessagesWithoutAckException
    extends Exception {
        public TooManyMessagesWithoutAckException() {
            super("Too Many Messages without Ack");
        }
    }

    private static class OffsetStore {
        private final TopicPartition topicPartition;
        private final Queue<OffsetReceivedAt> receivedOffsets = new LinkedList<OffsetReceivedAt>();
        private final Set<Long> processedOffsets = new HashSet<Long>();
        private final int unprocessedRecordMaxAge;
        private long unProcessedTotal = 0L;

        OffsetStore(TopicPartition topicPartition, int unprocessedRecordMaxAge) {
            this.topicPartition = topicPartition;
            this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
        }

        void received(long offset) {
            this.receivedOffsets.offer(OffsetReceivedAt.received(offset));
            ++this.unProcessedTotal;
        }

        void processed(long offset) {
            if (!this.receivedOffsets.isEmpty() && this.receivedOffsets.peek().getOffset() <= offset) {
                this.processedOffsets.add(offset);
            }
        }

        OptionalLong clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
            if (!this.processedOffsets.isEmpty()) {
                long largestSequentialProcessedOffset = -1L;
                while (!this.receivedOffsets.isEmpty() && this.processedOffsets.remove(this.receivedOffsets.peek().getOffset())) {
                    --this.unProcessedTotal;
                    largestSequentialProcessedOffset = this.receivedOffsets.poll().getOffset();
                }
                if (largestSequentialProcessedOffset > -1L) {
                    return OptionalLong.of(largestSequentialProcessedOffset);
                }
            }
            return OptionalLong.empty();
        }

        boolean hasTooManyMessagesWithoutAck() {
            if (this.receivedOffsets.isEmpty()) {
                return false;
            }
            if (System.currentTimeMillis() - this.receivedOffsets.peek().getReceivedAt() > (long)this.unprocessedRecordMaxAge) {
                KafkaLogging.log.receivedTooManyMessagesWithoutAcking(this.topicPartition.toString(), this.unProcessedTotal);
                return true;
            }
            return false;
        }
    }

    private static class OffsetReceivedAt {
        private final long offset;
        private final long receivedAt;

        private OffsetReceivedAt(long offset, long receivedAt) {
            this.offset = offset;
            this.receivedAt = receivedAt;
        }

        static OffsetReceivedAt received(long offset) {
            return new OffsetReceivedAt(offset, System.currentTimeMillis());
        }

        public long getOffset() {
            return this.offset;
        }

        public long getReceivedAt() {
            return this.receivedAt;
        }
    }
}

