/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.KafkaMessage;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.processor.ConsumerCloseState;
import io.micronaut.configuration.kafka.processor.ConsumerInfo;
import io.micronaut.configuration.kafka.processor.FluxCallback;
import io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor;
import io.micronaut.configuration.kafka.processor.MonoCallback;
import io.micronaut.configuration.kafka.processor.PartitionRetryState;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@Internal
abstract class ConsumerState {
    /** Shared logger; note that it is registered under the {@link KafkaConsumerProcessor} category. */
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    /** Processor used here to obtain producers, schedule retry tasks, publish events and report exceptions. */
    protected final KafkaConsumerProcessor kafkaConsumerProcessor;
    /** The listener bean; passed along with every reported exception and consulted as an optional {@link OffsetCommitCallback}. */
    protected final Object consumerBean;
    /** Retry bookkeeping per partition; {@code null} unless {@code info.errorStrategy.isRetry()} was true at construction. */
    @Nullable
    protected final Map<TopicPartition, PartitionRetryState> topicPartitionRetries;
    /** Pre-bound method arguments; holds the Kafka consumer itself when {@code info.consumerArg} is set. */
    protected final Map<Argument<?>, Object> boundArguments;
    /**
     * Set to {@code true} at the start of every poll cycle. Offsets are only committed when it is
     * {@code false} after {@link #processRecords}; presumably subclasses clear it on success (not visible here).
     */
    protected boolean failed;
    /** Static description of the consumer (client id, strategies, target topics, ...). */
    final ConsumerInfo info;
    /** The underlying Kafka consumer; closed when {@link #threadPollLoop()} exits. */
    final Consumer<?, ?> kafkaConsumer;
    /** Unmodifiable view of {@code kafkaConsumer.subscription()} taken at construction time. */
    final Set<String> subscriptions;
    /** Last partition assignment seen by the poll loop; {@code null} until the first refresh. */
    Set<TopicPartition> assignments;
    /** Partitions this class has actually paused on the consumer; lazily created, guarded by {@code this}. */
    private Set<TopicPartition> pausedTopicPartitions;
    /** Partitions that callers asked to pause; {@code null} means no outstanding request. Guarded by {@code this}. */
    private Set<TopicPartition> pauseRequests;
    /** Initialised to {@code !info.autoStartup}; while true, every assignment refresh pauses all assigned partitions. */
    private boolean autoPaused;
    /** Ensures the "started polling" event is published only once. */
    private boolean pollingStarted;
    /** Lifecycle marker: starts as NOT_STARTED, becomes POLLING after a poll, CLOSED when the loop ends. */
    private volatile ConsumerCloseState closedState;

    /**
     * Creates the state holder for one Kafka consumer.
     *
     * @param kafkaConsumerProcessor processor that owns this state
     * @param info                   static consumer description
     * @param consumer               the Kafka consumer to drive
     * @param consumerBean           the listener bean
     */
    protected ConsumerState(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo info, Consumer<?, ?> consumer, Object consumerBean) {
        this.kafkaConsumerProcessor = kafkaConsumerProcessor;
        this.info = info;
        this.kafkaConsumer = consumer;
        this.consumerBean = consumerBean;
        // Snapshot the subscription as it is right now, read-only.
        this.subscriptions = Collections.unmodifiableSet(consumer.subscription());
        // A consumer that must not start automatically begins life paused.
        this.autoPaused = !info.autoStartup;
        this.boundArguments = new HashMap<>(2);
        if (info.consumerArg != null) {
            // The listener method wants the Consumer injected: bind it up front.
            this.boundArguments.put((Argument<?>) info.consumerArg, consumer);
        }
        this.closedState = ConsumerCloseState.NOT_STARTED;
        // Retry bookkeeping is only allocated when the error strategy actually retries.
        this.topicPartitionRetries = info.errorStrategy.isRetry() ? new HashMap<>() : null;
    }

    /**
     * Polls the Kafka consumer for the next records. Called once per poll cycle.
     *
     * @param currentOffsets the value {@link #getCurrentOffsets()} returned just before the poll, may be {@code null}
     * @return the polled records; a {@code null} or empty result ends the cycle without processing
     */
    protected abstract ConsumerRecords<?, ?> pollRecords(@Nullable Map<TopicPartition, OffsetAndMetadata> currentOffsets);

    /**
     * Processes a non-empty result of {@link #pollRecords}.
     * The poll cycle sets {@code failed} to {@code true} beforehand and skips the offset commit when it is
     * still {@code true} afterwards, so implementations presumably reset it on success.
     *
     * @param consumerRecords the records returned by the poll
     * @param currentOffsets  the offsets captured before the poll (same value that was given to {@code pollRecords})
     */
    protected abstract void processRecords(ConsumerRecords<?, ?> consumerRecords, Map<TopicPartition, OffsetAndMetadata> currentOffsets);

    /**
     * Supplies the offsets that are handed to {@link #pollRecords} and {@link #processRecords}
     * during the same poll cycle. Invoked at the very start of each cycle, before any pause/poll work.
     *
     * @return the current offsets, or {@code null}
     */
    @Nullable
    protected abstract Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets();

    /**
     * Requests a pause of every partition in the current {@code assignments} set.
     * NOTE(review): {@code assignments} stays {@code null} until the poll loop has refreshed it once;
     * calling this earlier would fail inside {@link #pause(Collection)} - confirm callers never do that.
     */
    void pause() {
        this.pause(this.assignments);
    }

    /**
     * Records a pause request for the given partitions. The request is only stored here;
     * it is applied to the Kafka consumer later by the poll loop.
     *
     * @param topicPartitions partitions to pause
     */
    synchronized void pause(@NonNull Collection<TopicPartition> topicPartitions) {
        Set<TopicPartition> requests = this.pauseRequests;
        if (requests == null) {
            // First request ever (or first since a full resume): create the set lazily.
            requests = new HashSet<>();
            this.pauseRequests = requests;
        }
        requests.addAll(topicPartitions);
    }

    /**
     * Drops every outstanding pause request and switches off the automatic pause,
     * so the poll loop resumes all paused partitions on its next cycle.
     */
    synchronized void resume() {
        this.pauseRequests = null;
        this.autoPaused = false;
    }

    /**
     * Withdraws the pause request for the given partitions and switches off the automatic pause.
     *
     * @param topicPartitions partitions that may be consumed again
     */
    synchronized void resume(@NonNull Collection<TopicPartition> topicPartitions) {
        this.autoPaused = false;
        final Set<TopicPartition> requests = this.pauseRequests;
        if (requests == null) {
            // Nothing was ever requested, so there is nothing to withdraw.
            return;
        }
        requests.removeAll(topicPartitions);
    }

    /**
     * Tells whether all given partitions are both requested to be paused and actually paused.
     *
     * @param topicPartitions partitions to check
     * @return {@code true} only if every partition is contained in the pause requests and in the paused set
     */
    synchronized boolean isPaused(@NonNull Collection<TopicPartition> topicPartitions) {
        final Set<TopicPartition> requested = this.pauseRequests;
        final Set<TopicPartition> paused = this.pausedTopicPartitions;
        if (requested != null && paused != null) {
            return requested.containsAll(topicPartitions) && paused.containsAll(topicPartitions);
        }
        return false;
    }

    /**
     * Calls {@code wakeup()} on the Kafka consumer. The poll loop treats the resulting
     * {@link WakeupException} as its shutdown signal (see {@link #threadPollLoop()}).
     */
    void wakeUp() {
        this.kafkaConsumer.wakeup();
    }

    /**
     * Blocks the calling thread until the poll loop is no longer in the POLLING state.
     * This method does not close the Kafka consumer itself; it only waits for
     * {@link #threadPollLoop()} (running on another thread) to move {@code closedState} on.
     * If the loop never reached POLLING this returns immediately.
     */
    void close() {
        if (this.closedState == ConsumerCloseState.POLLING) {
            final Instant start = Instant.now();
            Instant silentTime = start;
            do {
                if (LOG.isTraceEnabled()) {
                    final Instant now = Instant.now();
                    if (now.isAfter(silentTime)) {
                        LOG.trace("Consumer {} is not closed yet (waiting {})", this.info.clientId, Duration.between(start, now));
                        // Stay quiet for a while so the wait does not flood the log.
                        silentTime = now.plusSeconds(5L);
                    }
                }
                // This is a spin-wait on a volatile flag: tell the runtime so, instead of
                // hammering the core at full speed while the poll thread winds down.
                Thread.onSpinWait();
            } while (this.closedState == ConsumerCloseState.POLLING);
        }
        LOG.debug("Consumer {} is closed", this.info.clientId);
    }

    /**
     * Body of the consumer thread: refreshes assignments, polls and processes records forever.
     * The Kafka consumer is closed on every exit path. A {@link WakeupException} is the regular
     * shutdown signal and is swallowed; any other throwable still propagates to the caller.
     * <p>
     * {@code closedState} is set to CLOSED whenever the loop ends, not only on wake-up. By then
     * the consumer has been closed in all cases, and leaving the state at POLLING after an
     * unexpected failure would make {@link #close()} wait forever.
     */
    void threadPollLoop() {
        try (Consumer<?, ?> ignored = this.kafkaConsumer) {
            //noinspection InfiniteLoopStatement
            while (true) {
                this.refreshAssignmentsPollAndProcessRecords();
            }
        } catch (WakeupException e) {
            // Expected: wakeUp() was called to stop this loop.
        } finally {
            this.closedState = ConsumerCloseState.CLOSED;
        }
    }

    /**
     * Runs one iteration of the poll loop. A {@link WakeupException} triggers a last
     * best-effort offset commit and is rethrown; every other exception is reported
     * through {@link #handleException} and the loop carries on.
     */
    private void refreshAssignmentsPollAndProcessRecords() {
        this.refreshAssignments();
        try {
            this.pollAndProcessRecords();
        } catch (WakeupException wakeup) {
            this.commitOffsetsBeforeShutdown();
            throw wakeup;
        } catch (Exception failure) {
            this.handleException(failure, null, null);
        }
    }

    /** Synchronously commits offsets on shutdown unless the last cycle failed or commits are disabled; errors are only logged. */
    private void commitOffsetsBeforeShutdown() {
        try {
            final boolean commitAllowed = !this.failed && this.info.offsetStrategy != OffsetStrategy.DISABLED;
            if (commitAllowed) {
                this.kafkaConsumer.commitSync();
            }
        } catch (Exception ex) {
            LOG.warn("Error committing Kafka offsets on shutdown: {}", ex.getMessage(), ex);
        }
    }

    /**
     * Re-reads the consumer's partition assignment, logging and storing it when it changed.
     * While the consumer is auto-paused, all assigned partitions are (re-)paused so that
     * newly assigned partitions do not start delivering records.
     */
    private void refreshAssignments() {
        final Set<TopicPartition> latest = this.kafkaConsumer.assignment();
        if (!latest.equals(this.assignments)) {
            LOG.info("Consumer [{}] assignments changed: {} -> {}", this.info.clientId, this.assignments, latest);
            this.assignments = Collections.unmodifiableSet(latest);
        }
        synchronized (this) {
            if (!this.autoPaused) {
                return;
            }
            // Record the request for later bookkeeping and pause on the consumer right away.
            this.pause(this.assignments);
            this.kafkaConsumer.pause(this.assignments);
        }
    }

    /**
     * One poll cycle: capture offsets, apply pause requests, poll, publish the one-time
     * "started polling" event, apply resumes, process the records and commit offsets
     * according to the configured strategy. Nothing is committed while {@code failed} is still set.
     */
    private void pollAndProcessRecords() {
        // Pessimistic default; it has to be cleared during processing for a commit to happen.
        this.failed = true;
        final Map<TopicPartition, OffsetAndMetadata> offsetsBeforePoll = this.getCurrentOffsets();
        this.pauseTopicPartitions();
        final ConsumerRecords<?, ?> polled = this.pollRecords(offsetsBeforePoll);
        this.closedState = ConsumerCloseState.POLLING;
        if (!this.pollingStarted) {
            this.pollingStarted = true;
            this.kafkaConsumerProcessor.publishStartedPollingEvent(this.kafkaConsumer);
        }
        this.resumeTopicPartitions();
        if (polled == null || polled.isEmpty()) {
            return;
        }
        if (this.info.method.isSuspend()) {
            // Suspend methods carry one extra trailing argument; bind it to null.
            final Argument<?>[] methodArguments = this.info.method.getArguments();
            this.boundArguments.put(methodArguments[methodArguments.length - 1], null);
        }
        this.processRecords(polled, offsetsBeforePoll);
        if (!this.failed) {
            this.commitProcessedOffsets(polled);
        }
    }

    /** Commits offsets after a successful cycle: blocking for SYNC, with a callback for ASYNC, not at all otherwise. */
    private void commitProcessedOffsets(ConsumerRecords<?, ?> consumerRecords) {
        final OffsetStrategy strategy = this.info.offsetStrategy;
        if (strategy == OffsetStrategy.SYNC) {
            try {
                this.kafkaConsumer.commitSync();
            } catch (CommitFailedException commitFailure) {
                this.handleException(commitFailure, consumerRecords, null);
            }
        } else if (strategy == OffsetStrategy.ASYNC) {
            this.kafkaConsumer.commitAsync(this.resolveCommitCallback());
        }
    }

    /**
     * Applies the outstanding pause requests to the Kafka consumer. Only partitions that are
     * currently assigned are paused; those are then remembered in {@code pausedTopicPartitions}.
     */
    private synchronized void pauseTopicPartitions() {
        final Set<TopicPartition> requested = this.pauseRequests;
        if (requested == null || requested.isEmpty()) {
            return;
        }
        // Work on a copy so the request set itself is left untouched.
        final Set<TopicPartition> toPause = new HashSet<>(requested);
        toPause.retainAll(this.assignments);
        if (toPause.isEmpty()) {
            return;
        }
        LOG.trace("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", this.info.clientId, toPause);
        this.kafkaConsumer.pause(toPause);
        LOG.debug("Paused Kafka consumption for Consumer [{}] from topic partition: {}", this.info.clientId, this.kafkaConsumer.paused());
        if (this.pausedTopicPartitions == null) {
            this.pausedTopicPartitions = new HashSet<>();
        }
        this.pausedTopicPartitions.addAll(toPause);
    }

    /**
     * Resumes every partition the Kafka consumer reports as paused that no longer has a
     * matching pause request, and removes it from {@code pausedTopicPartitions}.
     */
    private synchronized void resumeTopicPartitions() {
        final Set<TopicPartition> currentlyPaused = this.kafkaConsumer.paused();
        if (currentlyPaused.isEmpty()) {
            return;
        }
        // The request set cannot change while this monitor is held.
        final Set<TopicPartition> requests = this.pauseRequests;
        final List<TopicPartition> resumable = currentlyPaused.stream()
            .filter(partition -> requests == null || !requests.contains(partition))
            .toList();
        if (!resumable.isEmpty()) {
            LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", this.info.clientId, resumable);
            this.kafkaConsumer.resume(resumable);
        }
        if (this.pausedTopicPartitions != null) {
            for (TopicPartition partition : resumable) {
                this.pausedTopicPartitions.remove(partition);
            }
        }
    }

    /**
     * Forwards every value emitted by a listener's reactive result to the configured destination topics.
     *
     * @param consumerRecords the batch the record belongs to
     * @param consumerRecord  the record that produced the result
     * @param publisher       the listener's result stream
     * @param isBlocking      {@code true} to wait for all sends to finish, {@code false} to subscribe and return
     */
    protected void handleResultFlux(ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord, Flux<?> publisher, boolean isBlocking) {
        final Flux<RecordMetadata> recordMetadataProducer =
            publisher.flatMap(value -> this.sendToDestination(value, consumerRecord, consumerRecords));
        if (!isBlocking) {
            recordMetadataProducer.subscribe(recordMetadata ->
                LOG.trace("Method [{}] produced record metadata: {}", this.info.logMethod, recordMetadata));
            return;
        }
        final List<RecordMetadata> listRecords = recordMetadataProducer.collectList().block();
        LOG.trace("Method [{}] produced record metadata: {}", this.info.method, listRecords);
    }

    /**
     * Builds the publisher that sends one listener result to the destination topics.
     * Emits nothing when the value is {@code null} or no destination topic is configured.
     * Send failures are routed to {@link #handleSendToError}.
     */
    private Publisher<RecordMetadata> sendToDestination(Object value, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
        if (value == null || this.info.sendToTopics.isEmpty()) {
            return Flux.empty();
        }
        final Object key = consumerRecord.key();
        final Producer<?, ?> kafkaProducer;
        if (this.info.shouldSendOffsetsToTransaction) {
            kafkaProducer = this.kafkaConsumerProcessor.getTransactionalProducer(this.info.producerClientId, this.info.producerTransactionalId, byte[].class, Object.class);
        } else {
            // Fall back to the group id as client id; a missing key is treated as byte[].
            kafkaProducer = this.kafkaConsumerProcessor.getProducer(Optional.ofNullable(this.info.producerClientId).orElse(this.info.groupId), key != null ? key.getClass() : byte[].class, value.getClass());
        }
        final Flux<RecordMetadata> sent = Flux.create(emitter ->
            this.sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords));
        return sent.onErrorResume(error -> this.handleSendToError(error, consumerRecords, consumerRecord));
    }

    /**
     * Performs the send for one listener result and signals the outcome on the sink.
     * When offsets are sent to a transaction, the send is wrapped in begin/commit and
     * the transaction is aborted if anything throws.
     */
    private void sendToDestination(FluxSink<RecordMetadata> emitter, Producer<?, ?> kafkaProducer, Object key, Object value, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
        final boolean transactional = this.info.shouldSendOffsetsToTransaction;
        try {
            if (transactional) {
                this.beginTransaction(kafkaProducer);
            }
            this.sendToDestination(kafkaProducer, new FluxCallback(emitter), key, value, consumerRecord);
            if (transactional) {
                this.endTransaction(kafkaProducer, consumerRecords);
            }
            emitter.complete();
        } catch (Exception failure) {
            if (transactional) {
                this.abortTransaction(kafkaProducer, failure);
            }
            emitter.error(failure);
        }
    }

    /**
     * Sends the listener result to every configured destination topic.
     * Depending on the listener's return type the value is a collection of {@link KafkaMessage},
     * a single {@link KafkaMessage}, or a plain payload that reuses the incoming key and headers.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void sendToDestination(Producer<?, ?> kafkaProducer, Callback callback, Object key, Object value, ConsumerRecord<?, ?> consumerRecord) {
        for (String destinationTopic : this.info.sendToTopics) {
            if (this.info.returnsManyKafkaMessages) {
                for (Object element : (Iterable<?>) value) {
                    final ProducerRecord producerRecord = ConsumerState.createFromMessage(destinationTopic, (KafkaMessage<?, ?>) element);
                    kafkaProducer.send(producerRecord, callback);
                }
            } else {
                final ProducerRecord producerRecord;
                if (this.info.returnsOneKafkaMessage) {
                    producerRecord = ConsumerState.createFromMessage(destinationTopic, (KafkaMessage<?, ?>) value);
                } else {
                    producerRecord = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.headers());
                }
                LOG.trace("Sending record: {} for producer: {} {}", producerRecord, kafkaProducer, this.info.producerTransactionalId);
                kafkaProducer.send(producerRecord, callback);
            }
        }
    }

    /**
     * Starts a transaction on the given producer; a fenced producer is handed to the processor.
     */
    private void beginTransaction(Producer<?, ?> kafkaProducer) {
        LOG.trace("Beginning transaction for producer: {}", this.info.producerTransactionalId);
        try {
            kafkaProducer.beginTransaction();
        } catch (ProducerFencedException fenced) {
            this.kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, fenced);
        }
    }

    /**
     * Finishes the transaction: for every partition in the batch the offset following the
     * last record is sent to the transaction, which is then committed.
     */
    private void endTransaction(Producer<?, ?> kafkaProducer, ConsumerRecords<?, ?> consumerRecords) {
        final Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
        consumerRecords.partitions().forEach(partition -> {
            final List<? extends ConsumerRecord<?, ?>> partitionRecords = consumerRecords.records(partition);
            final ConsumerRecord<?, ?> lastRecord = partitionRecords.get(partitionRecords.size() - 1);
            // Kafka expects the offset of the next record to consume, hence + 1.
            nextOffsets.put(partition, new OffsetAndMetadata(lastRecord.offset() + 1L));
        });
        this.sendOffsetsToTransaction(kafkaProducer, nextOffsets);
    }

    /**
     * Aborts the producer's transaction after a failure; a fenced producer is handed to the processor.
     *
     * @param e the failure that caused the abort (only its message is logged)
     */
    private void abortTransaction(Producer<?, ?> kafkaProducer, Exception e) {
        LOG.trace("Aborting transaction for producer: {} because of error: {}", this.info.producerTransactionalId, e.getMessage());
        try {
            kafkaProducer.abortTransaction();
        } catch (ProducerFencedException fenced) {
            this.kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, fenced);
        }
    }

    /**
     * Attaches the given consumer offsets to the producer's transaction and commits it.
     * A fenced producer is handed to the processor.
     */
    private void sendOffsetsToTransaction(Producer<?, ?> kafkaProducer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, this.info.producerTransactionalId, this.info.groupId);
        try {
            kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(this.info.groupId));
            LOG.trace("Committing transaction for producer: {}", this.info.producerTransactionalId);
            kafkaProducer.commitTransaction();
            LOG.trace("Committed transaction for producer: {}", this.info.producerTransactionalId);
        } catch (ProducerFencedException fenced) {
            this.kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, fenced);
        }
    }

    /**
     * Error path of the reactive send: reports the failure and, if configured, tries to
     * redeliver the original record.
     *
     * @param error           the failure raised while sending the result
     * @param consumerRecords the batch the record belongs to
     * @param consumerRecord  the record whose result could not be sent
     * @return an empty publisher when redelivery is disabled, otherwise the redelivery result
     */
    private Publisher<RecordMetadata> handleSendToError(Throwable error, ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord) {
        this.handleException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + this.info.method + "]: " + error.getMessage(), error, consumerRecords, consumerRecord);
        if (!this.info.shouldRedeliver) {
            return Flux.empty();
        }
        // Describe the redelivery failure with its own message: the reported cause is the
        // redelivery exception, not the processing error that triggered the redelivery.
        return this.redeliver(consumerRecord).doOnError(ex -> this.handleException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + this.info.method + "]: " + ex.getMessage(), ex, consumerRecords, consumerRecord));
    }

    /**
     * Re-publishes a record to the topic and partition it came from, keeping key, value and headers.
     * Records with a {@code null} key or value are not redelivered.
     *
     * @return a mono driven by the producer's send callback, or an empty mono when nothing is sent
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Mono<RecordMetadata> redeliver(ConsumerRecord<?, ?> consumerRecord) {
        final Object key = consumerRecord.key();
        final Object value = consumerRecord.value();
        if (key != null && value != null) {
            LOG.debug("Attempting redelivery of record [{}] following error", consumerRecord);
            final Producer<?, ?> kafkaProducer = this.kafkaConsumerProcessor.getProducer(Optional.ofNullable(this.info.producerClientId).orElse(this.info.groupId), key.getClass(), value.getClass());
            final ProducerRecord producerRecord = new ProducerRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), key, value, consumerRecord.headers());
            LOG.trace("Sending record: {} for producer: {} {}", producerRecord, kafkaProducer, this.info.producerTransactionalId);
            return Mono.create(emitter -> kafkaProducer.send(producerRecord, new MonoCallback(emitter)));
        }
        return Mono.empty();
    }

    /**
     * Pauses the given partitions for the retry delay computed by the error strategy and
     * schedules their resumption. Does nothing when the strategy yields no delay.
     *
     * @param currentRetryCount number of the retry that is about to happen
     * @param partitions        partitions to hold back
     */
    protected void delayRetry(int currentRetryCount, Set<TopicPartition> partitions) {
        final Duration delay = this.info.errorStrategy.computeRetryDelay(this.info.retryDelay, currentRetryCount);
        if (delay == null) {
            return;
        }
        this.pause(partitions);
        this.kafkaConsumerProcessor.scheduleTask(delay, () -> this.resume(partitions));
    }

    /**
     * Decides whether a failed record should be retried.
     * With a conditional-retry strategy the processor is asked first and the configured
     * exception types serve as fallback; otherwise an empty type list means "retry everything".
     * Exception types are compared by exact class, not by inheritance.
     *
     * @return {@code true} if the record should be retried
     */
    protected boolean shouldRetryException(Throwable e, ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord) {
        if (this.info.errorStrategy.isConditionalRetry()) {
            final KafkaListenerException wrapped = this.wrapExceptionInKafkaListenerException(e.getMessage(), e, consumerRecords, consumerRecord);
            return this.kafkaConsumerProcessor.shouldRetryMessage(this.consumerBean, wrapped) || this.isConfiguredExceptionType(e);
        }
        return this.info.exceptionTypes.isEmpty() || this.isConfiguredExceptionType(e);
    }

    /** Tells whether the exact class of {@code e} is listed in {@code info.exceptionTypes}. */
    private boolean isConfiguredExceptionType(Throwable e) {
        return this.info.exceptionTypes.stream().anyMatch(e.getClass()::equals);
    }

    /**
     * Returns the retry state of a partition, updated for another attempt at the given offset:
     * the counter restarts at 1 when the offset differs from the tracked one, otherwise it is incremented.
     * Requires {@code topicPartitionRetries} to be non-null, i.e. a retrying error strategy.
     *
     * @param tp            the partition
     * @param currentOffset offset of the record being retried
     * @return the updated state for that partition
     */
    protected PartitionRetryState getPartitionRetryState(TopicPartition tp, long currentOffset) {
        final PartitionRetryState state = this.topicPartitionRetries.computeIfAbsent(tp, ignored -> new PartitionRetryState());
        if (state.currentRetryOffset == currentOffset) {
            state.currentRetryCount++;
        } else {
            state.currentRetryOffset = currentOffset;
            state.currentRetryCount = 1;
        }
        return state;
    }

    /**
     * Reports a failure to the processor, using the throwable's own message as description.
     *
     * @param e               the failure
     * @param consumerRecords the batch being handled, if any
     * @param consumerRecord  the record being handled, if any
     */
    protected void handleException(Throwable e, @Nullable ConsumerRecords<?, ?> consumerRecords, @Nullable ConsumerRecord<?, ?> consumerRecord) {
        this.handleException(e.getMessage(), e, consumerRecords, consumerRecord);
    }

    /**
     * Wraps the failure in a {@link KafkaListenerException} carrying the given message and
     * passes it, together with the listener bean, to the processor's exception handling.
     */
    private void handleException(String message, Throwable e, @Nullable ConsumerRecords<?, ?> consumerRecords, @Nullable ConsumerRecord<?, ?> consumerRecord) {
        this.kafkaConsumerProcessor.handleException(this.consumerBean, this.wrapExceptionInKafkaListenerException(message, e, consumerRecords, consumerRecord));
    }

    /**
     * Creates a {@link KafkaListenerException} that bundles the message and cause with the
     * listener bean, the Kafka consumer and the affected batch/record (either may be {@code null}).
     */
    private KafkaListenerException wrapExceptionInKafkaListenerException(String message, Throwable e, @Nullable ConsumerRecords<?, ?> consumerRecords, @Nullable ConsumerRecord<?, ?> consumerRecord) {
        return new KafkaListenerException(message, e, this.consumerBean, this.kafkaConsumer, consumerRecords, consumerRecord);
    }

    /**
     * Creates the callback used for asynchronous commits. A listener bean that itself implements
     * {@link OffsetCommitCallback} receives every completion; otherwise only failures are logged.
     */
    private OffsetCommitCallback resolveCommitCallback() {
        return (offsets, exception) -> {
            if (this.consumerBean instanceof OffsetCommitCallback delegate) {
                delegate.onComplete(offsets, exception);
                return;
            }
            if (exception != null) {
                LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", offsets, exception.getMessage(), exception);
            }
        };
    }

    /**
     * Turns a {@link KafkaMessage} into a producer record.
     *
     * @param topic   fallback topic, used when the message does not name one
     * @param message the message to convert
     * @return a record carrying the message's partition, timestamp, key, body and (converted) headers
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static ProducerRecord createFromMessage(String topic, KafkaMessage<?, ?> message) {
        final String messageTopic = message.getTopic();
        final String targetTopic = messageTopic != null ? messageTopic : topic;
        final Integer partition = message.getPartition();
        final Long timestamp = message.getTimestamp();
        final Object key = message.getKey();
        final Object body = message.getBody();
        final Map<String, Object> messageHeaders = message.getHeaders();
        final List<RecordHeader> recordHeaders = messageHeaders != null ? ConsumerState.convertHeaders(messageHeaders) : null;
        return new ProducerRecord(targetTopic, partition, timestamp, key, body, recordHeaders);
    }

    /**
     * Converts message headers into Kafka record headers.
     *
     * @param headers header names mapped to arbitrary values
     * @return one {@link RecordHeader} per entry, in the map's iteration order
     */
    private static List<RecordHeader> convertHeaders(Map<String, Object> headers) {
        return headers.entrySet().stream()
            .map(entry -> new RecordHeader(entry.getKey(), ConsumerState.toHeaderBytes(entry.getValue())))
            .toList();
    }

    /**
     * Encodes a single header value.
     * A {@code null} value stays {@code null} (previously this threw a NullPointerException),
     * a {@code byte[]} is used as-is (its {@code toString()} would only yield an identity string),
     * and anything else is written as the UTF-8 bytes of its {@code toString()}.
     */
    @Nullable
    private static byte[] toHeaderBytes(@Nullable Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[] bytes) {
            return bytes;
        }
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }
}

