/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.mutiny.core.Vertx;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KafkaThrottledLatestProcessedCommit
extends ContextHolder
implements KafkaCommitHandler {
    private static final Map<String, Map<Integer, TopicPartition>> TOPIC_PARTITIONS_CACHE = new ConcurrentHashMap<String, Map<Integer, TopicPartition>>();
    private final Map<TopicPartition, OffsetStore> offsetStores = new HashMap<TopicPartition, OffsetStore>();
    private final String groupId;
    private final ReactiveKafkaConsumer<?, ?> consumer;
    private final KafkaSource<?, ?> source;
    private final int unprocessedRecordMaxAge;
    private final int autoCommitInterval;
    private volatile long timerId = -1L;

    private KafkaThrottledLatestProcessedCommit(String groupId, Vertx vertx, ReactiveKafkaConsumer<?, ?> consumer, KafkaSource<?, ?> source, int unprocessedRecordMaxAge, int autoCommitInterval, int defaultTimeout) {
        super(vertx.getDelegate(), defaultTimeout);
        this.groupId = groupId;
        this.consumer = consumer;
        this.source = source;
        this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
        this.autoCommitInterval = autoCommitInterval;
    }

    public static void clearCache() {
        TOPIC_PARTITIONS_CACHE.clear();
    }

    public static KafkaThrottledLatestProcessedCommit create(Vertx vertx, ReactiveKafkaConsumer<?, ?> consumer, String groupId, KafkaConnectorIncomingConfiguration config, KafkaSource<?, ?> source) {
        int unprocessedRecordMaxAge = config.getThrottledUnprocessedRecordMaxAgeMs();
        int autoCommitInterval = config.config().getOptionalValue("auto.commit.interval.ms", Integer.class).orElse(5000);
        int defaultTimeout = config.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000);
        KafkaLogging.log.settingCommitInterval(groupId, autoCommitInterval);
        if (unprocessedRecordMaxAge <= 0) {
            KafkaLogging.log.disableThrottledCommitStrategyHealthCheck(groupId);
        } else {
            KafkaLogging.log.setThrottledCommitStrategyReceivedRecordMaxAge(groupId, unprocessedRecordMaxAge);
        }
        return new KafkaThrottledLatestProcessedCommit(groupId, vertx, consumer, source, unprocessedRecordMaxAge, autoCommitInterval, defaultTimeout);
    }

    private <K, V> TopicPartition getTopicPartition(IncomingKafkaRecord<K, V> record) {
        return TOPIC_PARTITIONS_CACHE.computeIfAbsent(record.getTopic(), topic -> new ConcurrentHashMap()).computeIfAbsent(record.getPartition(), partition -> new TopicPartition(record.getTopic(), partition.intValue()));
    }

    @Override
    public void partitionsAssigned(Collection<TopicPartition> partitions) {
        this.runOnContextAndAwait(() -> {
            this.stopFlushAndCheckHealthTimer();
            if (!partitions.isEmpty() || !this.offsetStores.isEmpty()) {
                this.startFlushAndCheckHealthTimer();
            }
            return null;
        });
    }

    @Override
    public void partitionsRevoked(Collection<TopicPartition> partitions) {
        this.runOnContextAndAwait(() -> {
            this.stopFlushAndCheckHealthTimer();
            return null;
        });
        Callable<Tuple2> task = () -> {
            HashMap<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
            for (TopicPartition partition : new HashSet<TopicPartition>(this.offsetStores.keySet())) {
                OffsetStore store;
                long largestOffset;
                if (!partitions.contains(partition) || (largestOffset = (store = this.offsetStores.remove(partition)).clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset()) <= -1L) continue;
                toCommit.put(partition, new OffsetAndMetadata(largestOffset + 1L, null));
            }
            return Tuple2.of(toCommit, (Object)(!this.offsetStores.isEmpty() ? 1 : 0));
        };
        Tuple2 result = this.runOnContextAndAwait(task);
        if (!((Map)result.getItem1()).isEmpty()) {
            this.consumer.unwrap().commitSync((Map)result.getItem1());
        }
        if (((Boolean)result.getItem2()).booleanValue()) {
            this.runOnContext(this::startFlushAndCheckHealthTimer);
        }
    }

    private void stopFlushAndCheckHealthTimer() {
        if (this.timerId != -1L) {
            this.vertx.cancelTimer(this.timerId);
            this.timerId = -1L;
        }
    }

    private void startFlushAndCheckHealthTimer() {
        this.timerId = this.vertx.setTimer((long)this.autoCommitInterval, x -> this.runOnContext(() -> this.flushAndCheckHealth((long)x)));
    }

    @Override
    public <K, V> IncomingKafkaRecord<K, V> received(IncomingKafkaRecord<K, V> record) {
        TopicPartition recordsTopicPartition = this.getTopicPartition(record);
        this.offsetStores.computeIfAbsent(recordsTopicPartition, k -> new OffsetStore((TopicPartition)k, this.unprocessedRecordMaxAge)).received(record.getOffset());
        if (this.timerId < 0L) {
            this.startFlushAndCheckHealthTimer();
        }
        return record;
    }

    private Map<TopicPartition, Long> clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping() {
        HashMap<TopicPartition, Long> offsetsMapping = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, OffsetStore> entry : this.offsetStores.entrySet()) {
            long offset = entry.getValue().clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset();
            if (offset <= -1L) continue;
            offsetsMapping.put(entry.getKey(), offset);
        }
        return offsetsMapping;
    }

    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> record) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.runOnContext(() -> {
            TopicPartition topicPartition = this.getTopicPartition(record);
            OffsetStore store = this.offsetStores.get(topicPartition);
            if (store != null) {
                store.processed(record.getOffset());
            } else {
                KafkaLogging.log.messageAckedForRevokedTopicPartition(record.getOffset(), this.groupId, topicPartition.toString());
            }
            future.complete(null);
        });
        return future;
    }

    private void flushAndCheckHealth(long timerId) {
        Map<TopicPartition, Long> offsetsMapping = this.clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping();
        if (!offsetsMapping.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsets = this.getOffsets(offsetsMapping);
            this.consumer.commit(offsets).subscribe().with(a -> this.startFlushAndCheckHealthTimer(), f -> this.startFlushAndCheckHealthTimer());
        } else {
            this.startFlushAndCheckHealthTimer();
        }
        if (this.unprocessedRecordMaxAge > 0) {
            for (OffsetStore store : this.offsetStores.values()) {
                if (!store.hasTooManyMessagesWithoutAck()) continue;
                long lastOffset = store.getLastCommittedOffset();
                String topic = store.topicPartition.topic();
                int partition = store.topicPartition.partition();
                TooManyMessagesWithoutAckException exception = new TooManyMessagesWithoutAckException(topic, partition, lastOffset);
                this.source.reportFailure((Throwable)((Object)exception), true);
            }
        }
    }

    @Override
    public void terminate(boolean graceful) {
        long stillUnprocessed;
        if (graceful && (stillUnprocessed = this.waitForProcessing()) > 0L) {
            KafkaLogging.log.messageStillUnprocessedAfterTimeout(stillUnprocessed);
        }
        this.commitAllAndAwait();
        this.runOnContextAndAwait(() -> {
            this.offsetStores.clear();
            this.stopFlushAndCheckHealthTimer();
            return null;
        });
    }

    private long waitForProcessing() {
        int attempt = this.autoCommitInterval / 100;
        for (int i = 0; i < attempt; ++i) {
            long sum = this.offsetStores.values().stream().map(OffsetStore::getUnprocessedCount).mapToLong(l -> l).sum();
            if (sum == 0L) {
                return sum;
            }
            KafkaLogging.log.waitingForMessageProcessing(sum);
            try {
                Thread.sleep(100L);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return this.offsetStores.values().stream().map(OffsetStore::getUnprocessedCount).mapToLong(l -> l).sum();
    }

    private void commitAllAndAwait() {
        Map offsetsMapping = this.runOnContextAndAwait(this::clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping);
        this.commitAndAwait(offsetsMapping);
    }

    private void commitAndAwait(Map<TopicPartition, Long> offsetsMapping) {
        if (!offsetsMapping.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsets = this.getOffsets(offsetsMapping);
            CompletableFuture stage = this.consumer.commit(offsets).subscribeAsCompletionStage();
            try {
                stage.get(this.autoCommitInterval, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsets(Map<TopicPartition, Long> offsetsMapping) {
        HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (Map.Entry<TopicPartition, Long> entry : offsetsMapping.entrySet()) {
            map.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L, null));
        }
        return map;
    }

    public static class TooManyMessagesWithoutAckException
    extends NoStackTraceThrowable {
        public TooManyMessagesWithoutAckException(String topic, int partition, long lastOffset) {
            super("Too Many Messages without acknowledgement in topic " + topic + " (partition:" + partition + "), last committed offset is " + lastOffset);
        }
    }

    private class OffsetStore {
        private final TopicPartition topicPartition;
        private final Queue<OffsetReceivedAt> receivedOffsets = new LinkedList<OffsetReceivedAt>();
        private final Set<Long> processedOffsets = new HashSet<Long>();
        private final int unprocessedRecordMaxAge;
        private final AtomicLong unProcessedTotal = new AtomicLong();
        private long lastCommitted = -1L;

        OffsetStore(TopicPartition topicPartition, int unprocessedRecordMaxAge) {
            this.topicPartition = topicPartition;
            this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
        }

        long getLastCommittedOffset() {
            return this.lastCommitted;
        }

        void received(long offset) {
            this.receivedOffsets.offer(OffsetReceivedAt.received(offset));
            this.unProcessedTotal.incrementAndGet();
        }

        void processed(long offset) {
            OffsetReceivedAt received = this.receivedOffsets.peek();
            if (received != null && received.getOffset() <= offset) {
                this.processedOffsets.add(offset);
            }
        }

        long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
            if (!this.processedOffsets.isEmpty()) {
                long largestSequentialProcessedOffset = -1L;
                while (!this.receivedOffsets.isEmpty() && this.processedOffsets.remove(this.receivedOffsets.peek().getOffset())) {
                    this.unProcessedTotal.decrementAndGet();
                    OffsetReceivedAt poll = this.receivedOffsets.poll();
                    if (poll == null) continue;
                    largestSequentialProcessedOffset = poll.getOffset();
                }
                if (largestSequentialProcessedOffset > -1L) {
                    this.lastCommitted = largestSequentialProcessedOffset;
                    return largestSequentialProcessedOffset;
                }
            }
            return -1L;
        }

        boolean hasTooManyMessagesWithoutAck() {
            long time;
            if (this.receivedOffsets.isEmpty()) {
                return false;
            }
            OffsetReceivedAt peek = this.receivedOffsets.peek();
            long l = time = peek == null ? 0L : System.currentTimeMillis() - peek.getReceivedAt();
            if (time > (long)this.unprocessedRecordMaxAge) {
                KafkaLogging.log.receivedTooManyMessagesWithoutAcking(this.topicPartition.toString(), this.unProcessedTotal.get(), this.lastCommitted);
                return true;
            }
            return false;
        }

        long getUnprocessedCount() {
            return this.unProcessedTotal.get();
        }
    }

    private static class OffsetReceivedAt {
        private final long offset;
        private final long receivedAt;

        private OffsetReceivedAt(long offset, long receivedAt) {
            this.offset = offset;
            this.receivedAt = receivedAt;
        }

        static OffsetReceivedAt received(long offset) {
            return new OffsetReceivedAt(offset, System.currentTimeMillis());
        }

        public long getOffset() {
            return this.offset;
        }

        public long getReceivedAt() {
            return this.receivedAt;
        }
    }
}

