/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;

import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerSeekUtil;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriber;

/**
 * Static helpers that build a per-partition position map for a Kafka {@link Consumer}, either derived from a point
 * in time ({@link #getPositionsBasedOnTime}) or from the current end of every partition
 * ({@link #getHeadPositions}).
 * <p>
 * In both cases the stored position for a partition is the looked-up offset minus one, and a partition is only
 * included when its looked-up offset is greater than 1.
 */
class ConsumerPositionsUtil {

    private ConsumerPositionsUtil() {
        // Utility class; not meant to be instantiated.
    }

    /**
     * Builds the positions for all partitions resolved through
     * {@link ConsumerSeekUtil#topicPartitions(Consumer, TopicSubscriber)}, based on a point in time.
     * <p>
     * For every partition the offset returned by {@code consumer.offsetsForTimes} for {@code rawDefaultAt} is used.
     * When that lookup has no entry for the partition, the partition's end offset is used instead.
     *
     * @param consumer     the consumer used to resolve the partitions and to look up offsets
     * @param subscriber   the subscriber used to resolve the partitions
     * @param rawDefaultAt the instant, converted to epoch milliseconds, to look up offsets for
     * @return a new mutable map from partition to {@code offset - 1}; partitions whose offset is not greater than 1
     * are absent
     */
    static Map<TopicPartition, Long> getPositionsBasedOnTime(@Nonnull Consumer<?, ?> consumer,
                                                             @Nonnull TopicSubscriber subscriber,
                                                             @Nonnull Instant rawDefaultAt) {
        List<TopicPartition> all = ConsumerSeekUtil.topicPartitions(consumer, subscriber);
        Map<TopicPartition, Long> positions = new HashMap<>();
        OffsetSupplier offsetSupplier = new OffsetSupplier(consumer, rawDefaultAt, all);
        all.forEach(assignedPartition ->
                            putPositionBefore(positions, assignedPartition, offsetSupplier.getOffset(assignedPartition))
        );
        return positions;
    }

    /**
     * Builds the positions for all partitions resolved through
     * {@link ConsumerSeekUtil#topicPartitions(Consumer, TopicSubscriber)}, based on the end offsets reported by
     * {@code consumer.endOffsets}.
     *
     * @param consumer   the consumer used to resolve the partitions and to look up the end offsets
     * @param subscriber the subscriber used to resolve the partitions
     * @return a new mutable map from partition to {@code endOffset - 1}; partitions whose end offset is not greater
     * than 1 are absent
     */
    static Map<TopicPartition, Long> getHeadPositions(@Nonnull Consumer<?, ?> consumer,
                                                      @Nonnull TopicSubscriber subscriber) {
        List<TopicPartition> all = ConsumerSeekUtil.topicPartitions(consumer, subscriber);
        Map<TopicPartition, Long> positions = new HashMap<>();
        // Typed (not raw) map, so the lambda parameters are TopicPartition and Long rather than Object.
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(all);
        endOffsets.forEach((assignedPartition, offset) -> putPositionBefore(positions, assignedPartition, offset));
        return positions;
    }

    /**
     * Stores {@code offset - 1} as the position of {@code partition}, but only when {@code offset} is greater than
     * 1. Otherwise {@code positions} is left untouched for that partition.
     *
     * @param positions the map to add the position to
     * @param partition the partition the offset belongs to
     * @param offset    the looked-up offset for the partition
     */
    private static void putPositionBefore(Map<TopicPartition, Long> positions, TopicPartition partition, long offset) {
        // NOTE(review): an offset of exactly 1 is skipped as well as 0, so a position of 0 is never stored.
        // Confirm against the code consuming these positions whether the threshold should be `> 0`.
        if (offset > 1L) {
            positions.put(partition, offset - 1L);
        }
    }

    /**
     * Looks up, once at construction, both the time-based offsets and the end offsets of a list of partitions and
     * answers per-partition offset queries from those results.
     */
    private static class OffsetSupplier {

        private final Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap;
        private final Map<TopicPartition, Long> endOffsets;

        /**
         * Queries the given {@code consumer} for the offsets at {@code rawDefaultAt} and for the end offsets of all
         * given partitions.
         *
         * @param consumer     the consumer to query
         * @param rawDefaultAt the instant, converted to epoch milliseconds, used for every partition
         * @param all          the partitions to look up offsets for
         */
        private OffsetSupplier(Consumer<?, ?> consumer, Instant rawDefaultAt, List<TopicPartition> all) {
            long defaultAt = rawDefaultAt.toEpochMilli();
            // The same timestamp is searched for in every partition.
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            all.forEach(tp -> timestampsToSearch.put(tp, defaultAt));
            this.partitionOffsetMap = consumer.offsetsForTimes(timestampsToSearch);
            this.endOffsets = consumer.endOffsets(all);
        }

        /**
         * Returns the time-based offset of the given partition, or an empty {@link Optional} when the time-based
         * lookup holds no (or a {@code null}) entry for it.
         */
        private Optional<Long> getDefaultOffset(TopicPartition assignedPartition) {
            return Optional.ofNullable(this.partitionOffsetMap.get(assignedPartition))
                           .map(OffsetAndTimestamp::offset);
        }

        /**
         * Returns the end offset of the given partition. Unboxing fails with a {@link NullPointerException} when
         * the end offsets lookup holds no entry for the partition.
         */
        private long getEndOffset(TopicPartition assignedPartition) {
            return this.endOffsets.get(assignedPartition);
        }

        /**
         * Returns the time-based offset of the given partition when available, falling back to its end offset. The
         * end offset is only read when the fallback is actually needed.
         */
        private long getOffset(TopicPartition assignedPartition) {
            return this.getDefaultOffset(assignedPartition)
                       .orElseGet(() -> this.getEndOffset(assignedPartition));
        }
    }
}

