/*
 * Decompiled with CFR 0.152.
 */
package org.citrusframework.kafka.endpoint;

import jakarta.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.citrusframework.context.TestContext;
import org.citrusframework.endpoint.EndpointConfiguration;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.kafka.endpoint.KafkaEndpointConfiguration;
import org.citrusframework.kafka.endpoint.KafkaMessageConsumerUtils;
import org.citrusframework.kafka.endpoint.KafkaMessageFilter;
import org.citrusframework.kafka.endpoint.KafkaMessageSingleConsumer;
import org.citrusframework.kafka.endpoint.selector.KafkaMessageSelector;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractSelectiveMessageConsumer;
import org.citrusframework.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Selective Kafka consumer that searches a topic for records matching a {@link KafkaMessageFilter}.
 *
 * <p>On every {@link #receive(String, TestContext, long)} call the wrapped {@link KafkaConsumer} is
 * assigned to all partitions of the resolved topic, positioned according to the filter's event
 * lookback window, and polled on a dedicated single-thread executor so that the overall message
 * timeout can be enforced from the calling thread.
 */
class KafkaMessageFilteringConsumer
extends AbstractSelectiveMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageFilteringConsumer.class);
    private final KafkaConsumer<Object, Object> consumer;
    // Not final: receive() replaces it whenever a textual selector is supplied.
    private KafkaMessageFilter kafkaMessageFilter;

    /**
     * Creates a new builder for this consumer.
     *
     * @return a fresh, empty builder
     */
    public static KafkaMessageFilteringConsumerBuilder builder() {
        return new KafkaMessageFilteringConsumerBuilder();
    }

    /**
     * Creates the consumer; a {@link KafkaMessageFilter} is only pre-built when at least one of the
     * optional filter arguments is given.
     *
     * @param endpointConfiguration the endpoint configuration handed to the super class
     * @param consumer the Kafka consumer used for all lookups
     * @param eventLookbackWindow optional lookback window of the filter
     * @param pollTimeout optional timeout of a single poll
     * @param kafkaMessageSelector optional selector applied to each record
     */
    private KafkaMessageFilteringConsumer(KafkaEndpointConfiguration endpointConfiguration, KafkaConsumer<Object, Object> consumer, @Nullable Duration eventLookbackWindow, @Nullable Duration pollTimeout, @Nullable KafkaMessageSelector kafkaMessageSelector) {
        // NOTE(review): the name passed to the super class is that of KafkaMessageSingleConsumer, not of
        // this class - possibly a copy-paste leftover; kept as-is because the name is externally visible.
        super(KafkaMessageSingleConsumer.class.getSimpleName(), endpointConfiguration);
        this.consumer = consumer;
        if (Objects.nonNull(eventLookbackWindow) || Objects.nonNull(kafkaMessageSelector) || Objects.nonNull(pollTimeout)) {
            this.kafkaMessageFilter = KafkaMessageFilter.kafkaMessageFilter()
                    .eventLookbackWindow(eventLookbackWindow)
                    .kafkaMessageSelector(kafkaMessageSelector)
                    .pollTimeout(pollTimeout)
                    .buildFilter();
        }
    }

    /**
     * @return the wrapped Kafka consumer
     */
    public KafkaConsumer<Object, Object> getConsumer() {
        return this.consumer;
    }

    /**
     * @return the currently active filter, or {@code null} if none has been configured yet
     */
    public KafkaMessageFilter getKafkaMessageFilter() {
        return this.kafkaMessageFilter;
    }

    /**
     * Looks up the records matching the active filter on the resolved topic and converts them into a message.
     *
     * <p>If {@code selector} contains text, it replaces the currently configured filter; that
     * replacement persists on this instance beyond the call.
     *
     * @param selector textual selector; may be empty when a filter was configured through the builder
     * @param testContext the test context used to resolve the topic and to parse the records
     * @param timeout maximum time to wait for a result, in milliseconds
     * @return the message built from the matching consumer records
     * @throws CitrusRuntimeException if neither a selector nor a filter is available, if the lookup
     *         fails, or if no record matches
     * @throws MessageTimeoutException if the lookup does not finish within {@code timeout}
     */
    @Override
    public Message receive(String selector, TestContext testContext, long timeout) {
        // Use one predicate for both decisions: a selector without text never builds a filter below,
        // so it must not be able to bypass this guard and hit a null filter afterwards.
        boolean selectorProvided = StringUtils.hasText(selector);
        if (!selectorProvided && Objects.isNull(this.kafkaMessageFilter)) {
            throw new CitrusRuntimeException("Cannot invoke filtering kafka message consumer without selectors");
        }
        if (selectorProvided) {
            this.kafkaMessageFilter = KafkaMessageFilter.kafkaMessageFilter(selector, this.getEndpointConfiguration().getKafkaMessageSelectorFactory());
        }
        this.kafkaMessageFilter.sanitize();

        String topic = KafkaMessageConsumerUtils.resolveTopic(this.getEndpointConfiguration(), testContext);
        logger.debug("Receiving Kafka message on topic '{}' using selector: {}", topic, this.kafkaMessageFilter);

        if (!this.consumer.subscription().isEmpty()) {
            logger.warn("Cancelling active subscriptions of consumer before looking for Kafka events, because subscription to topics, partitions and pattern are mutually exclusive");
            this.consumer.unsubscribe();
        }

        Duration messageTimeout = Duration.ofMillis(timeout);
        if (this.kafkaMessageFilter.getPollTimeout().compareTo(messageTimeout) > 0) {
            logger.warn("Truncating poll timeout to maximum message timeout ({} ms) - having one single poll exceeding the total timeout would prevent proper timeout handling", messageTimeout.toMillis());
            this.kafkaMessageFilter.setPollTimeout(messageTimeout);
        }

        List<ConsumerRecord<Object, Object>> consumerRecords = this.findMatchingMessageInTopicWithTimeout(topic, timeout);
        if (consumerRecords.isEmpty()) {
            // NOTE(review): this reports the raw selector argument, which is null/empty when the
            // filter was configured through the builder.
            throw new CitrusRuntimeException("Failed to resolve Kafka message using selector: " + selector);
        }

        Message received = KafkaMessageConsumerUtils.parseConsumerRecordsToMessage(consumerRecords, this.getEndpointConfiguration(), testContext);
        if (logger.isDebugEnabled()) {
            logger.info("Received Kafka message on topic '{}': {}", topic, received);
        } else {
            logger.info("Received Kafka message on topic '{}'", topic);
        }
        return received;
    }

    /**
     * Runs the record lookup on a dedicated single-thread executor and waits at most {@code timeout}
     * milliseconds for its result. The executor is always shut down before this method returns.
     *
     * @param topic the topic to search
     * @param timeout maximum time to wait, in milliseconds
     * @return the matching records, possibly empty
     * @throws CitrusRuntimeException if the waiting thread is interrupted or the lookup itself fails
     * @throws MessageTimeoutException if the lookup does not complete in time
     */
    private List<ConsumerRecord<Object, Object>> findMatchingMessageInTopicWithTimeout(String topic, long timeout) {
        logger.trace("Applied timeout is {} ms", timeout);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<List<ConsumerRecord<Object, Object>>> handler = executorService.submit(() -> this.findMessagesSatisfyingMatcher(topic));
        try {
            return handler.get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CitrusRuntimeException("Thread was interrupted while waiting for Kafka message", e);
        }
        catch (ExecutionException e) {
            throw new CitrusRuntimeException(String.format("Failed to receive message on Kafka topic '%s'", topic), e);
        }
        catch (TimeoutException e) {
            // The TimeoutException raised by Future.get carries neither message nor cause, so log
            // the applied timeout rather than the exception's (always null) message.
            logger.error("Failed to receive message on Kafka topic '{}' within {} ms", topic, timeout);
            handler.cancel(true);
            throw new MessageTimeoutException(timeout, topic, e);
        }
        finally {
            this.shutdownExecutorAwaitingCurrentPoll(executorService);
        }
    }

    /**
     * Shuts the executor down, waiting up to one poll timeout for a graceful stop and one more
     * after a forced stop. The consumer is unsubscribed only once the executor has terminated.
     *
     * @param executorService the executor running the record lookup
     */
    private void shutdownExecutorAwaitingCurrentPoll(ExecutorService executorService) {
        long pollTimeoutMillis = this.kafkaMessageFilter.getPollTimeout().toMillis();
        executorService.shutdown();
        try {
            boolean terminated = executorService.awaitTermination(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (!terminated) {
                executorService.shutdownNow();
                terminated = executorService.awaitTermination(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (!terminated) {
                logger.error("Executor did not terminate, check for memory leaks!");
                // The worker may still be using the consumer. KafkaConsumer is documented as not
                // thread-safe, so do not touch it from this thread; an exception thrown here would
                // also replace the one propagating through the caller's finally block.
                return;
            }
            logger.debug("Executor successfully shut down, unsubscribing consumer");
            this.consumer.unsubscribe();
        }
        catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the endpoint configuration, narrowed to the Kafka-specific type
     */
    @Override
    protected KafkaEndpointConfiguration getEndpointConfiguration() {
        return (KafkaEndpointConfiguration)super.getEndpointConfiguration();
    }

    /**
     * Assigns the consumer to all partitions of the topic, seeks according to the lookback window
     * and polls until a poll returns no records, collecting every record that lies within the
     * lookback window and is accepted by the filter's selector.
     *
     * @param topic the topic to search
     * @return all matching records found, possibly empty
     */
    private List<ConsumerRecord<Object, Object>> findMessagesSatisfyingMatcher(String topic) {
        List<TopicPartition> partitions = this.consumer.partitionsFor(topic).stream()
                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                .toList();
        this.consumer.assign(partitions);
        this.offsetConsumerOnTopicByLookbackWindow(partitions);

        Instant endTime = Instant.now();
        Instant startTime = endTime.minus(this.kafkaMessageFilter.getEventLookbackWindow());
        List<ConsumerRecord<Object, Object>> matchingConsumerRecords = new ArrayList<>();

        ConsumerRecords<Object, Object> consumerRecords;
        while (!(consumerRecords = this.consumer.poll(this.kafkaMessageFilter.getPollTimeout())).isEmpty()) {
            for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
                // NOTE(review): the search stops at the first record newer than the end time, whichever
                // partition it came from - confirm that remaining records of other partitions may be skipped.
                if (KafkaMessageFilteringConsumer.isConsumerRecordNewerThanEndTime(consumerRecord, endTime)) {
                    return matchingConsumerRecords;
                }
                boolean withinWindow = !KafkaMessageFilteringConsumer.isConsumerRecordOlderThanStartTime(consumerRecord, startTime);
                if (withinWindow && this.kafkaMessageFilter.getKafkaMessageSelector().matches(consumerRecord)) {
                    matchingConsumerRecords.add(consumerRecord);
                }
            }
        }
        return matchingConsumerRecords;
    }

    /**
     * Positions the consumer on every given partition at the offset Kafka reports for the start of
     * the lookback window, or at the end of the partition when no offset is reported for it.
     *
     * @param partitions the partitions the consumer is assigned to
     */
    private void offsetConsumerOnTopicByLookbackWindow(List<TopicPartition> partitions) {
        // Computed once so that every partition is positioned relative to the same instant.
        long lookbackStartMillis = Instant.now()
                .minusMillis(this.kafkaMessageFilter.getEventLookbackWindow().toMillis())
                .toEpochMilli();
        Map<TopicPartition, Long> partitionsWithTimestamps = partitions.stream()
                .collect(Collectors.toMap(Function.identity(), partition -> lookbackStartMillis));

        Map<TopicPartition, OffsetAndTimestamp> newOffsets = this.consumer.offsetsForTimes(partitionsWithTimestamps);
        logger.trace("Applying new offsets: {}", newOffsets);
        newOffsets.forEach((partition, partitionOffset) -> {
            if (Objects.nonNull(partitionOffset)) {
                this.consumer.seek(partition, partitionOffset.offset());
            } else {
                // No offset was reported for the requested timestamp: start at the end of the partition.
                this.consumer.seekToEnd(Collections.singletonList(partition));
            }
        });
    }

    /** Returns whether the record's timestamp lies strictly after {@code endTime}. */
    private static boolean isConsumerRecordNewerThanEndTime(ConsumerRecord<Object, Object> consumerRecord, Instant endTime) {
        return consumerRecord.timestamp() > endTime.toEpochMilli();
    }

    /** Returns whether the record's timestamp lies strictly before {@code startTime}. */
    private static boolean isConsumerRecordOlderThanStartTime(ConsumerRecord<Object, Object> consumerRecord, Instant startTime) {
        return consumerRecord.timestamp() < startTime.toEpochMilli();
    }

    /**
     * Fluent builder for {@link KafkaMessageFilteringConsumer}. All setters simply store the given
     * value and return the builder itself.
     */
    public static class KafkaMessageFilteringConsumerBuilder {
        private KafkaEndpointConfiguration endpointConfiguration;
        private KafkaConsumer<Object, Object> consumer;
        private Duration eventLookbackWindow;
        private Duration pollTimeout;
        private KafkaMessageSelector kafkaMessageSelector;

        /** Sets the endpoint configuration passed to the consumer. */
        public KafkaMessageFilteringConsumerBuilder endpointConfiguration(KafkaEndpointConfiguration endpointConfiguration) {
            this.endpointConfiguration = endpointConfiguration;
            return this;
        }

        /** Sets the Kafka consumer used for all lookups. */
        public KafkaMessageFilteringConsumerBuilder consumer(KafkaConsumer<Object, Object> consumer) {
            this.consumer = consumer;
            return this;
        }

        /** Sets the optional lookback window of the filter. */
        public KafkaMessageFilteringConsumerBuilder eventLookbackWindow(Duration eventLookbackWindow) {
            this.eventLookbackWindow = eventLookbackWindow;
            return this;
        }

        /** Sets the optional timeout of a single poll. */
        public KafkaMessageFilteringConsumerBuilder pollTimeout(Duration pollTimeout) {
            this.pollTimeout = pollTimeout;
            return this;
        }

        /** Sets the optional selector applied to each record. */
        public KafkaMessageFilteringConsumerBuilder kafkaMessageSelector(KafkaMessageSelector kafkaMessageSelector) {
            this.kafkaMessageSelector = kafkaMessageSelector;
            return this;
        }

        /**
         * Builds the consumer from the values collected so far.
         *
         * @return a new {@link KafkaMessageFilteringConsumer}
         */
        public KafkaMessageFilteringConsumer build() {
            return new KafkaMessageFilteringConsumer(this.endpointConfiguration, this.consumer, this.eventLookbackWindow, this.pollTimeout, this.kafkaMessageSelector);
        }
    }
}

