/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.kafka.internal.model.consumer;

import com.mulesoft.connectors.kafka.api.source.TopicPartition;
import com.mulesoft.connectors.kafka.internal.error.KafkaErrorType;
import com.mulesoft.connectors.kafka.internal.error.exception.CommitFailedException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidInputException;
import com.mulesoft.connectors.kafka.internal.error.exception.NoPollException;
import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.model.TopicPartitionDescription;
import com.mulesoft.connectors.kafka.internal.model.consumer.DefaultConsumerPool;
import com.mulesoft.connectors.kafka.internal.model.consumer.KafkaConsumerRecord;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.mule.runtime.core.api.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultMuleConsumer
implements MuleConsumer {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMuleConsumer.class);
    private final UUID id = UUID.randomUUID();
    private final Properties properties;
    private final long maxPollTimeout;
    private long lastPollTime = 0L;
    private Consumer<InputStream, InputStream> consumer;
    private List<ConsumerRecord<InputStream, InputStream>> bufRecords = Collections.synchronizedList(new ArrayList());
    private List<ConsumerRecord<InputStream, InputStream>> inFlightRecords;
    private Semaphore commitSemaphore = new Semaphore(1);
    private DefaultConsumerPool pool;

    public DefaultMuleConsumer(Function<Properties, Consumer<InputStream, InputStream>> consumerFactory, Properties properties) {
        this.properties = properties;
        this.maxPollTimeout = Long.parseLong((String)properties.get("max.poll.interval.ms"));
        this.consumer = consumerFactory.apply(this.getProperties());
    }

    @Override
    public void setPool(DefaultConsumerPool pool) {
        this.pool = pool;
    }

    @Override
    public UUID getId() {
        return this.id;
    }

    @Override
    public void assign(List<TopicPartition> assignments) {
        this.consumer.unsubscribe();
        this.consumer.assign((Collection)assignments.stream().map(each -> new org.apache.kafka.common.TopicPartition(each.getTopic(), each.getPartition())).collect(Collectors.toList()));
    }

    @Override
    public void subscribe(List<String> topicPatterns) {
        this.consumer.unsubscribe();
        try {
            Pattern singlePattern = Pattern.compile(topicPatterns.stream().peek(eachTopicPattern -> logger.info("Subscribing to topic pattern '{}'.", eachTopicPattern)).map(Pattern::compile).map(Pattern::pattern).collect(Collectors.joining("|")));
            this.consumer.subscribe(singlePattern, new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> collection) {
                    logger.warn("Revoked topic-partitions: {}", collection);
                }

                public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> collection) {
                    logger.warn("Assigned topic-partitions: {}", collection);
                }
            });
        }
        catch (PatternSyntaxException e) {
            throw new InvalidInputException("The subscription patterns are invalid", e);
        }
    }

    @Override
    public Set<TopicPartition> assignment() {
        Set assignments = this.consumer.assignment();
        return assignments.stream().map(topicPartition -> new TopicPartition(topicPartition.topic(), topicPartition.partition())).peek(description -> logger.trace("Found description: {}", description)).collect(Collectors.toSet());
    }

    @Override
    public Set<TopicPartitionDescription> assignmentDescription() {
        return this.consumer.assignment().stream().map(topicPartition -> new TopicPartitionDescription(topicPartition.topic(), topicPartition.partition(), this.consumer.position(topicPartition), (Long)this.consumer.beginningOffsets(Collections.singletonList(topicPartition)).get(topicPartition), (Long)this.consumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition))).peek(description -> logger.trace("Found description: {}", description)).collect(Collectors.toSet());
    }

    @Override
    public void seek(String topic, int partition, long offset) {
        this.consumer.seek(new org.apache.kafka.common.TopicPartition(topic, partition), offset);
        List remainingRecords = this.bufRecords.stream().filter(record -> !record.topic().equals(topic) || record.partition() != partition).collect(Collectors.toList());
        this.bufRecords.clear();
        this.bufRecords.addAll(remainingRecords);
    }

    @Override
    public List<ConsumerRecord<InputStream, InputStream>> poll(Duration timeout) {
        this.inFlightRecords = this.getRecords(timeout).peek(this::logRecord).collect(Collectors.toList());
        return !this.inFlightRecords.isEmpty() ? this.inFlightRecords : null;
    }

    @Override
    public ConsumerRecord<InputStream, InputStream> singleElementPoll(Duration timeout) {
        this.inFlightRecords = Stream.of(this.getRecords(timeout).findFirst().orElseThrow(NotFoundException::new)).peek(this::logRecord).collect(Collectors.toList());
        return !this.inFlightRecords.isEmpty() ? this.inFlightRecords.get(0) : null;
    }

    private void throwIfMultipleCommits(boolean throwException) {
        if (throwException) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to commit because the commit operation is already made by another thread.");
            }
            throw new CommitFailedException("Failed to commit because the commit operation is already made by another thread.", KafkaErrorType.ALREADY_COMMITED);
        }
    }

    @Override
    public void commit() {
        try {
            this.throwIfMultipleCommits(!this.commitSemaphore.tryAcquire());
            HashMap commits = new HashMap();
            this.validatePreviousPollReturnedResults();
            this.inFlightRecords.forEach(record -> commits.put(new org.apache.kafka.common.TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L)));
            this.consumer.commitSync(commits);
            this.throwIfMultipleCommits(!this.bufRecords.removeAll(this.inFlightRecords));
        }
        catch (KafkaException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Commit failed, marking pool as invalid. Exception was {}.", (Object)e.getMessage());
            }
            this.bufRecords.clear();
            this.inFlightRecords.clear();
            this.pool.invalidate();
            throw e;
        }
        finally {
            this.commitSemaphore.release();
        }
    }

    @Override
    public void asyncCommit() {
        HashMap<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> commits = new HashMap<org.apache.kafka.common.TopicPartition, OffsetAndMetadata>();
        this.validatePreviousPollReturnedResults();
        this.inFlightRecords.forEach(record -> commits.put(new org.apache.kafka.common.TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L)));
        this.commitAsync(commits);
        this.bufRecords.removeAll(this.inFlightRecords);
    }

    private void commitAsync(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> commits) {
        this.consumer.commitAsync(commits, (offsets, exception) -> {
            if (exception != null) {
                if (exception instanceof RetriableException) {
                    logger.debug("A retriable exception happened during the execution. The next execution will re-try committing the processed messages.");
                } else {
                    logger.error("An unrecoverable exception happened during the asyncCommit.");
                    this.bufRecords.clear();
                    this.inFlightRecords.clear();
                    this.pool.invalidate();
                }
            }
        });
    }

    @Override
    public void resetBuffer() {
        HashMap resetPartitionsMap = new HashMap();
        ArrayList alreadyCommitedMessages = new ArrayList();
        this.inFlightRecords.forEach(record -> {
            if (this.bufRecords.contains(record)) {
                Map topicPartitionsMap = (Map)resetPartitionsMap.get(record.topic());
                if (topicPartitionsMap == null) {
                    HashMap<Integer, Long> valueMap = new HashMap<Integer, Long>();
                    valueMap.put(record.partition(), record.offset());
                    resetPartitionsMap.put(record.topic(), valueMap);
                } else if (topicPartitionsMap.get(record.partition()) == null || (Long)topicPartitionsMap.get(record.partition()) > record.offset()) {
                    topicPartitionsMap.put(record.partition(), record.offset());
                }
            } else {
                alreadyCommitedMessages.add(record);
            }
        });
        this.inFlightRecords.removeAll(alreadyCommitedMessages);
        resetPartitionsMap.entrySet().forEach(entry -> ((Map)entry.getValue()).entrySet().forEach(partitionEntry -> this.seek((String)entry.getKey(), (Integer)partitionEntry.getKey(), (Long)partitionEntry.getValue())));
    }

    @Override
    public void close() {
        this.consumer.close();
    }

    private Stream<ConsumerRecord<InputStream, InputStream>> getRecords(Duration timeout) {
        boolean isMaxPollOverdue;
        boolean bl = isMaxPollOverdue = this.maxPollTimeout < System.currentTimeMillis() - this.lastPollTime;
        if (isMaxPollOverdue || this.bufRecords.isEmpty()) {
            this.bufRecords.clear();
            logger.trace("Buffer empty. Retrieving records.");
            this.consumer.poll(timeout).forEach(message -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Adding message from topic {} partition {} with offset {} to buffer.", new Object[]{message.topic(), message.partition(), message.offset()});
                }
                this.bufRecords.add(new KafkaConsumerRecord(message));
            });
            this.lastPollTime = System.currentTimeMillis();
            logger.trace("Retrieved {} records.", (Object)this.bufRecords.size());
            if (this.bufRecords.isEmpty()) {
                throw new NotFoundException("No message was found when executing the poll on KafkaConsumer");
            }
        }
        return this.bufRecords.stream();
    }

    protected void validatePreviousPollReturnedResults() {
        if (this.inFlightRecords == null || this.inFlightRecords.isEmpty()) {
            throw new NoPollException("There is no previous poll to commit");
        }
    }

    private void logRecord(ConsumerRecord<InputStream, InputStream> record) {
        if (logger.isTraceEnabled()) {
            logger.trace("Message key: {}", (Object)Optional.ofNullable(record.key()).map(IOUtils::toString).orElse(" no key"));
            logger.trace("Message value: {}", (Object)Optional.ofNullable(record.value()).map(IOUtils::toString).orElse(" no value"));
            logger.trace("Message topic: {}", (Object)Optional.ofNullable(record.topic()).orElse(" no topic"));
            logger.trace("Message partition: {}", (Object)Optional.ofNullable(record.partition()).map(String::valueOf).orElse(" no partition"));
            logger.trace("Message offset: {}", (Object)Optional.ofNullable(record.offset()).map(String::valueOf).orElse(" no offset"));
            Optional.ofNullable(record.headers()).orElseGet(ArrayList::new).forEach(header -> logger.trace("Message Header: '{}': '{}'", (Object)header.key(), (Object)new String(Optional.ofNullable(header.value()).orElseGet(""::getBytes))));
        }
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        DefaultMuleConsumer that = (DefaultMuleConsumer)o;
        return Objects.equals(this.id, that.id);
    }

    public int hashCode() {
        return Objects.hash(this.id);
    }

    private Properties getProperties() {
        return this.properties;
    }
}

