/*
 * Decompiled with CFR 0.152.
 */
package hu.icellmobilsoft.coffee.module.redisstream.bootstrap;

import hu.icellmobilsoft.coffee.dto.exception.BaseException;
import hu.icellmobilsoft.coffee.module.redisstream.annotation.RedisStreamConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.config.StreamGroupConfig;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamBaseConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamConsumerExecutor;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.RedisStreamConsumerExecutor;
import hu.icellmobilsoft.coffee.se.logging.Logger;
import hu.icellmobilsoft.coffee.tool.utils.annotation.AnnotationUtil;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.AfterDeploymentValidation;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.enterprise.inject.spi.Extension;
import java.lang.annotation.Annotation;
import java.util.Set;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ConsumerStarterExtension
implements Extension {
    private static final Logger LOG = Logger.getLogger(ConsumerStarterExtension.class);
    private ManagedExecutorService managedExecutorService;

    void validate(@Observes AfterDeploymentValidation adv, BeanManager beanManager) {
        LOG.debug("Checking consumer for RedisStreamConsumer...");
        Set beans = beanManager.getBeans(IRedisStreamBaseConsumer.class, new Annotation[]{RedisStreamConsumer.LITERAL});
        LOG.info("Found [{0}] RedisStreamConsumer bean...", new Object[]{beans.size()});
        if (!beans.isEmpty()) {
            Instance iconfig = beanManager.createInstance().select(StreamGroupConfig.class, new Annotation[0]);
            StreamGroupConfig config = (StreamGroupConfig)iconfig.get();
            this.initManagedExecutorService(adv);
            if (this.managedExecutorService == null) {
                return;
            }
            beans.stream().forEach(bean -> this.handleConsumerBean((Bean<?>)bean, config));
            iconfig.destroy((Object)config);
        }
    }

    private void initManagedExecutorService(AfterDeploymentValidation adv) {
        try {
            this.managedExecutorService = (ManagedExecutorService)InitialContext.doLookup("java:jboss/ee/concurrency/executor/default");
        }
        catch (NamingException e) {
            adv.addDeploymentProblem((Throwable)new BaseException("Can't get ManagedExecutorService", (Throwable)e));
        }
    }

    private void handleConsumerBean(Bean<?> bean, StreamGroupConfig config) {
        LOG.debug("Handling [{0}] bean", new Object[]{bean.getBeanClass()});
        RedisStreamConsumer redisStreamConsumerAnnotation = (RedisStreamConsumer)AnnotationUtil.getAnnotation((Class)bean.getBeanClass(), RedisStreamConsumer.class);
        config.setConfigKey(redisStreamConsumerAnnotation.group());
        int threads = config.getConsumerThreadsCount().orElse(redisStreamConsumerAnnotation.consumerThreadsCount());
        Instance consumerExecutor = CDI.current().select(RedisStreamConsumerExecutor.class, new Annotation[0]);
        for (int i = 0; i < threads; ++i) {
            IRedisStreamConsumerExecutor executor = (IRedisStreamConsumerExecutor)consumerExecutor.get();
            this.startThread(executor, redisStreamConsumerAnnotation, bean);
        }
    }

    private void startThread(IRedisStreamConsumerExecutor executor, RedisStreamConsumer redisStreamConsumerAnnotation, Bean<?> bean) {
        executor.init(redisStreamConsumerAnnotation.configKey(), redisStreamConsumerAnnotation.group(), bean);
        LOG.info("Starting Redis stream consumer with executor, class [{0}] for configKey [{1}], group [{2}]...", new Object[]{bean.getBeanClass(), redisStreamConsumerAnnotation.configKey(), redisStreamConsumerAnnotation.group()});
        this.managedExecutorService.submit((Runnable)executor);
        LOG.info("consumer class [{0}] started.", new Object[]{bean.getBeanClass()});
    }
}

