/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.messaging.kafka.basic.consumer;

import io.eventuate.messaging.kafka.basic.consumer.BackPressureActions;
import io.eventuate.messaging.kafka.basic.consumer.BackPressureConfig;
import io.eventuate.messaging.kafka.basic.consumer.BackPressureManager;
import io.eventuate.messaging.kafka.basic.consumer.ConsumerCallbacks;
import io.eventuate.messaging.kafka.basic.consumer.ConsumerPropertiesFactory;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerMessageHandler;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerState;
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.basic.consumer.KafkaMessageConsumer;
import io.eventuate.messaging.kafka.basic.consumer.KafkaMessageProcessor;
import io.eventuate.messaging.kafka.basic.consumer.KafkaMessageProcessorFailedException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventuateKafkaConsumer {
    private static Logger logger = LoggerFactory.getLogger(EventuateKafkaConsumer.class);
    private final String subscriberId;
    private final EventuateKafkaConsumerMessageHandler handler;
    private final List<String> topics;
    private final KafkaConsumerFactory kafkaConsumerFactory;
    private final BackPressureConfig backPressureConfig;
    private final long pollTimeout;
    private AtomicBoolean stopFlag = new AtomicBoolean(false);
    private Properties consumerProperties;
    private volatile EventuateKafkaConsumerState state = EventuateKafkaConsumerState.CREATED;
    private volatile boolean closeConsumerOnStop = true;
    private Optional<ConsumerCallbacks> consumerCallbacks = Optional.empty();

    public EventuateKafkaConsumer(String subscriberId, EventuateKafkaConsumerMessageHandler handler, List<String> topics, String bootstrapServers, EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties, KafkaConsumerFactory kafkaConsumerFactory) {
        this.subscriberId = subscriberId;
        this.handler = handler;
        this.topics = topics;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.consumerProperties = ConsumerPropertiesFactory.makeDefaultConsumerProperties(bootstrapServers, subscriberId);
        this.consumerProperties.putAll(eventuateKafkaConsumerConfigurationProperties.getProperties());
        this.backPressureConfig = eventuateKafkaConsumerConfigurationProperties.getBackPressure();
        this.pollTimeout = eventuateKafkaConsumerConfigurationProperties.getPollTimeout();
    }

    public void setConsumerCallbacks(Optional<ConsumerCallbacks> consumerCallbacks) {
        this.consumerCallbacks = consumerCallbacks;
    }

    public boolean isCloseConsumerOnStop() {
        return this.closeConsumerOnStop;
    }

    public void setCloseConsumerOnStop(boolean closeConsumerOnStop) {
        this.closeConsumerOnStop = closeConsumerOnStop;
    }

    public static List<PartitionInfo> verifyTopicExistsBeforeSubscribing(KafkaMessageConsumer consumer, String topic) {
        try {
            logger.debug("Verifying Topic {}", (Object)topic);
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            logger.debug("Got these partitions {} for Topic {}", partitions, (Object)topic);
            return partitions;
        }
        catch (Throwable e) {
            logger.error("Got exception: ", e);
            throw new RuntimeException(e);
        }
    }

    private void maybeCommitOffsets(KafkaMessageConsumer consumer, KafkaMessageProcessor processor) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = processor.offsetsToCommit();
        if (!offsetsToCommit.isEmpty()) {
            this.consumerCallbacks.ifPresent(ConsumerCallbacks::onTryCommitCallback);
            logger.debug("Committing offsets {} {}", (Object)this.subscriberId, offsetsToCommit);
            consumer.commitOffsets(offsetsToCommit);
            logger.debug("Committed offsets {}", (Object)this.subscriberId);
            processor.noteOffsetsCommitted(offsetsToCommit);
            this.consumerCallbacks.ifPresent(ConsumerCallbacks::onCommitedCallback);
        }
    }

    public void start() {
        try {
            KafkaMessageConsumer consumer = this.kafkaConsumerFactory.makeConsumer(this.subscriberId, this.consumerProperties);
            KafkaMessageProcessor processor = new KafkaMessageProcessor(this.subscriberId, this.handler);
            BackPressureManager backpressureManager = new BackPressureManager(this.backPressureConfig);
            for (String topic : this.topics) {
                EventuateKafkaConsumer.verifyTopicExistsBeforeSubscribing(consumer, topic);
            }
            this.subscribe(consumer);
            new Thread(() -> {
                try {
                    this.runPollingLoop(consumer, processor, backpressureManager);
                    this.maybeCommitOffsets(consumer, processor);
                    this.state = EventuateKafkaConsumerState.STOPPED;
                    if (this.closeConsumerOnStop) {
                        consumer.close();
                    }
                }
                catch (KafkaMessageProcessorFailedException e) {
                    logger.trace("Terminating since KafkaMessageProcessorFailedException");
                    this.state = EventuateKafkaConsumerState.MESSAGE_HANDLING_FAILED;
                    consumer.close(Duration.of(200L, ChronoUnit.MILLIS));
                }
                catch (Throwable e) {
                    logger.error("Got exception: ", e);
                    this.state = EventuateKafkaConsumerState.FAILED;
                    consumer.close(Duration.of(200L, ChronoUnit.MILLIS));
                    throw new RuntimeException(e);
                }
                logger.trace("Stopped in state {}", (Object)this.state);
            }, "Eventuate-subscriber-" + this.subscriberId).start();
            this.state = EventuateKafkaConsumerState.STARTED;
        }
        catch (Exception e) {
            logger.error("Error subscribing", (Throwable)e);
            this.state = EventuateKafkaConsumerState.FAILED_TO_START;
            throw new RuntimeException(e);
        }
    }

    private void subscribe(KafkaMessageConsumer consumer) {
        logger.debug("Subscribing to {} {}", (Object)this.subscriberId, this.topics);
        consumer.subscribe(this.topics);
        logger.debug("Subscribed to {} {}", (Object)this.subscriberId, this.topics);
    }

    private void runPollingLoop(KafkaMessageConsumer consumer, KafkaMessageProcessor processor, BackPressureManager backPressureManager) {
        while (!this.stopFlag.get()) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.of(100L, ChronoUnit.MILLIS));
            if (!records.isEmpty()) {
                logger.debug("Got {} {} records", (Object)this.subscriberId, (Object)records.count());
            }
            if (records.isEmpty()) {
                processor.throwFailureException();
            } else {
                for (ConsumerRecord record : records) {
                    logger.debug("processing record subscriberId={} tpo=({} {} {}) body={}", new Object[]{this.subscriberId, record.topic(), record.partition(), record.offset(), record.value()});
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("EventuateKafkaAggregateSubscriptions subscriber = %s, offset = %d, key = %s, value = %s", this.subscriberId, record.offset(), record.key(), record.value()));
                    }
                    processor.process((ConsumerRecord<String, byte[]>)record);
                }
            }
            if (!records.isEmpty()) {
                logger.debug("Processed {} {} records", (Object)this.subscriberId, (Object)records.count());
            }
            try {
                this.maybeCommitOffsets(consumer, processor);
            }
            catch (Exception e) {
                logger.error("Cannot commit offsets", (Throwable)e);
                this.consumerCallbacks.ifPresent(ConsumerCallbacks::onCommitFailedCallback);
            }
            if (!records.isEmpty()) {
                logger.debug("To commit {} {}", (Object)this.subscriberId, (Object)processor.getPending());
            }
            int backlog = processor.backlog();
            HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
            for (ConsumerRecord record : records) {
                topicPartitions.add(new TopicPartition(record.topic(), record.partition()));
            }
            BackPressureActions actions = backPressureManager.update(topicPartitions, backlog);
            if (!actions.pause.isEmpty()) {
                logger.info("Subscriber {} pausing {} due to backlog {} > {}", new Object[]{this.subscriberId, actions.pause, backlog, this.backPressureConfig.getHigh()});
                consumer.pause(actions.pause);
            }
            if (actions.resume.isEmpty()) continue;
            logger.info("Subscriber {} resuming {} due to backlog {} <= {}", new Object[]{this.subscriberId, actions.resume, backlog, this.backPressureConfig.getLow()});
            consumer.resume(actions.resume);
        }
    }

    public void stop() {
        this.stopFlag.set(true);
    }

    public EventuateKafkaConsumerState getState() {
        return this.state;
    }
}

