/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.processor.ConsumerState;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.runtime.event.ApplicationShutdownEvent;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requires(bean=AdminClient.class)
class KafkaConsumerGroupManager
implements ApplicationEventListener<ApplicationShutdownEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerGroupManager.class);
    private final AdminClient adminClient;
    private final Map<String, ConsumerState> registerConsumerForGroupDeletion = new ConcurrentHashMap<String, ConsumerState>();
    private final List<String> uniqueGroupIdsDeleteOnShutdown = new ArrayList<String>();

    public KafkaConsumerGroupManager(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public void onApplicationEvent(ApplicationShutdownEvent event) {
        LOG.info("Application shutdown initiated. Preparing to delete registered Kafka unique consumer groups.");
        if (!this.uniqueGroupIdsDeleteOnShutdown.isEmpty()) {
            LOG.info("Closing {} consumers and attempting to delete the following consumer groups: {}", (Object)this.uniqueGroupIdsDeleteOnShutdown.size(), this.uniqueGroupIdsDeleteOnShutdown);
            this.closeConsumers();
            this.adminClient.deleteConsumerGroups(this.uniqueGroupIdsDeleteOnShutdown).all().whenComplete((voidResult, throwable) -> {
                if (throwable == null) {
                    LOG.info("Successfully deleted the following consumer groups: {}", this.uniqueGroupIdsDeleteOnShutdown);
                } else {
                    LOG.warn("Failed to delete the following consumer groups: {}. Error: {}", new Object[]{this.uniqueGroupIdsDeleteOnShutdown, throwable.getMessage(), throwable});
                }
            });
        } else {
            LOG.info("No unique consumer groups are registered for deletion.");
        }
    }

    private void closeConsumers() {
        LOG.info("Closing all registered Kafka consumers who has unique group id.");
        this.registerConsumerForGroupDeletion.values().forEach(ConsumerState::wakeUp);
        this.registerConsumerForGroupDeletion.values().forEach(ConsumerState::close);
        this.registerConsumerForGroupDeletion.clear();
        LOG.info("All registered Kafka consumers who have unique group IDs have been successfully closed.");
    }

    void registerConsumerForGroupDeletion(String clientId, ConsumerState consumerState) {
        if (clientId != null && !clientId.isEmpty() && consumerState != null) {
            this.registerConsumerForGroupDeletion.put(clientId, consumerState);
            LOG.info("Registered consumer with client ID '{}' for group deletion on shutdown.", (Object)clientId);
        } else {
            LOG.warn("Failed to register consumer. Either client ID is null/empty or consumer state is null.");
        }
    }

    public void registerConsumerGroupIdForDeletion(String groupId) {
        if (groupId != null && !groupId.isEmpty()) {
            this.uniqueGroupIdsDeleteOnShutdown.add(groupId);
            LOG.info("Registered consumer group ID for deletion: {}", (Object)groupId);
        } else {
            LOG.warn("Attempted to register a null or empty consumer group ID for deletion");
        }
    }

    List<String> getRegisteredClientIdsForDeletion() {
        return this.registerConsumerForGroupDeletion.keySet().stream().toList();
    }
}

