/*
 * Decompiled with CFR 0.152.
 */
package org.mule.modules.kafka.internal.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.mule.modules.kafka.api.attributes.KafkaMessageAttributes;
import org.mule.modules.kafka.internal.error.exception.KafkaException;
import org.mule.modules.kafka.internal.error.exception.UnableToShutdownMuleConsumerException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MuleConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MuleConsumer.class);
    private static final int POLLING_TIMEOUT_MS = 1000;
    private final int partitions;
    private ExecutorService executorService;
    private List<ConsumerGroupTask> consumerGroupTasks = new ArrayList<ConsumerGroupTask>();
    private List<Consumer<?, ?>> consumers;
    private Map<String, String> partitionOffsets;
    private String topic;
    private SourceCallback callback;

    public MuleConsumer(List<Consumer<?, ?>> consumers, ExecutorService executorService, int partitions) {
        this.consumers = consumers;
        this.executorService = executorService;
        this.partitions = partitions;
    }

    public void run(SourceCallback callback, String topic, Map<String, String> partitionOffsets) {
        this.partitionOffsets = partitionOffsets;
        this.topic = topic;
        this.callback = callback;
        this.subscribe(topic);
        this.launchListeningTasks();
    }

    private void subscribe(String topic) {
        for (Consumer<?, ?> consumer : this.consumers) {
            consumer.subscribe(Collections.singletonList(topic));
        }
    }

    private void launchListeningTasks() {
        int threadCount = this.partitions;
        logger.debug("Rising {} threads for processing messages.", (Object)threadCount);
        for (int i = 0; i < threadCount; ++i) {
            ConsumerGroupTask consumerGroupTask = new ConsumerGroupTask(this.callback, 1000L, this.consumers.get(i));
            this.executorService.execute(consumerGroupTask);
            this.consumerGroupTasks.add(consumerGroupTask);
        }
    }

    public void shutdown() {
        this.stopRunningTasks();
        this.shutdownExecutorService();
        this.closeConsumer();
    }

    private void shutdownExecutorService() {
        if (this.executorService != null) {
            logger.debug("Shutting down executor service.");
            this.executorService.shutdown();
            logger.debug("Executor service shutted down.");
            try {
                logger.debug("Awaiting termination of executor service.");
                if (!this.executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)) {
                    logger.warn("Thread pool did not terminate cleanly.");
                }
                logger.debug("All threads terminated.");
            }
            catch (Exception e) {
                logger.error("Thread pool did not terminate cleanly.", (Throwable)e);
                throw new UnableToShutdownMuleConsumerException("Unexpected exception while trying to shutdown.", e);
            }
        }
    }

    private void closeConsumer() {
        logger.debug("Closing Kafka consumers.");
        for (Consumer<?, ?> consumer : this.consumers) {
            consumer.close();
        }
        logger.debug("Kafka consumers closed.");
    }

    private void stopRunningTasks() {
        for (ConsumerGroupTask consumerGroupTask : this.consumerGroupTasks) {
            consumerGroupTask.stop();
        }
    }

    private Boolean seekOffset(Map<String, String> offsetsMap, Consumer<?, ?> consumer, String topic) {
        if (offsetsMap != null) {
            for (TopicPartition partitionInfo : consumer.assignment()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[seekOffset] - Searching if any offset value is provided for partition {} in {} topic ", (Object)partitionInfo.partition(), (Object)topic);
                }
                Long offsetValue = null;
                String partitionStringValue = null;
                try {
                    partitionStringValue = String.valueOf(partitionInfo.partition());
                    if (offsetsMap.get(partitionStringValue) != null) {
                        offsetValue = Long.parseLong(offsetsMap.get(partitionStringValue));
                    }
                }
                catch (Exception e) {
                    logger.debug("[seekOffset] - Failed to parse offset value {} : {}", (Object)partitionStringValue, (Object)e);
                    return false;
                }
                if (offsetValue == null) continue;
                if (logger.isDebugEnabled()) {
                    logger.debug("[seekOffset] - Seek to offset {} in {} topic for partition {} ", new Object[]{offsetValue, topic, partitionInfo.partition()});
                }
                try {
                    consumer.seek(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), offsetValue.longValue());
                }
                catch (Exception e) {
                    logger.debug("[seekOffset] - Failed to seek position for {} topic on partition {} to {}", new Object[]{topic, partitionInfo.partition(), offsetValue});
                }
                return true;
            }
        }
        return false;
    }

    private class ConsumerGroupTask
    implements Runnable {
        private SourceCallback callback;
        private long pollingTimeoutMs;
        private volatile boolean running;
        private Consumer<?, ?> consumer;

        public ConsumerGroupTask(SourceCallback callback, long pollingTimeoutMs, Consumer consumer) {
            this.callback = callback;
            this.pollingTimeoutMs = pollingTimeoutMs;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            this.markAsRunning();
            this.messagesListenerLoop();
        }

        private void messagesListenerLoop() {
            ConsumerRecords<?, ?> consumerRecords = this.pollNextChunkOfMessages();
            if (!MuleConsumer.this.seekOffset(MuleConsumer.this.partitionOffsets, this.consumer, MuleConsumer.this.topic).booleanValue()) {
                this.processGottenRecords(consumerRecords);
            }
            while (this.running) {
                consumerRecords = this.pollNextChunkOfMessages();
                this.processGottenRecords(consumerRecords);
            }
            logger.debug("Task finished its execution.");
        }

        private void processGottenRecords(ConsumerRecords<?, ?> consumerRecords) {
            if (consumerRecords == null) {
                return;
            }
            for (ConsumerRecord consumerRecord : consumerRecords) {
                try {
                    logger.debug("Passing message with topic: {}, key: {}, offset: {}, partition: {} for processing. Thread: {}", new Object[]{consumerRecord.topic(), consumerRecord.key(), consumerRecord.offset(), consumerRecord.partition(), Thread.currentThread().getName()});
                    Result.Builder resultBuilder = Result.builder();
                    this.callback.handle(resultBuilder.output(consumerRecord.value()).attributes((Object)new KafkaMessageAttributes(consumerRecord.topic(), (String)consumerRecord.key(), consumerRecord.partition(), consumerRecord.offset())).build());
                }
                catch (Exception e) {
                    logger.error("Unable to feed message into source callback...", (Throwable)e);
                    this.callback.onConnectionException(new ConnectionException((Throwable)e, (Object)"Unable to feed message into source callback"));
                }
            }
        }

        private ConsumerRecords<?, ?> pollNextChunkOfMessages() {
            ConsumerRecords consumedRecords = null;
            logger.debug("Polling for messages with a timeout of {}.", (Object)this.pollingTimeoutMs);
            try {
                consumedRecords = this.consumer.poll(this.pollingTimeoutMs);
                logger.debug("Got {} messages.", (Object)consumedRecords.count());
            }
            catch (AuthorizationException e) {
                logger.error("Consumer not authorized.", (Throwable)e);
                this.handlePollException((Exception)((Object)e));
            }
            catch (InvalidOffsetException e) {
                logger.error("Invalid offset is requested. Task won't be able to recover so it will be stopped.", (Throwable)e);
                this.handlePollException((Exception)((Object)e));
            }
            catch (WakeupException e) {
                logger.debug("Task was forcibly woken up.", (Throwable)e);
                this.handlePollException((Exception)((Object)e));
            }
            catch (KafkaException e) {
                logger.warn("Task got unknown exception from server. Task won't be able to recover so it will be stopped.", (Throwable)e);
                this.handlePollException(e);
            }
            return consumedRecords;
        }

        private void handlePollException(Exception e) {
            this.stop();
            this.callback.onConnectionException(new ConnectionException((Throwable)e));
        }

        private void markAsRunning() {
            this.running = true;
        }

        private void stop() {
            logger.debug("Stopping task...");
            this.running = false;
            this.consumer.wakeup();
        }
    }
}

