/*
 * Decompiled with CFR 0.152.
 */
package hu.icellmobilsoft.coffee.module.redisstream.bootstrap;

import hu.icellmobilsoft.coffee.module.redisstream.annotation.RedisStreamConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.config.StreamGroupConfig;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamBaseConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamConsumerExecutor;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.RedisStreamConsumerExecutor;
import hu.icellmobilsoft.coffee.se.logging.Logger;
import hu.icellmobilsoft.coffee.tool.utils.annotation.AnnotationUtil;
import jakarta.annotation.Resource;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject;
import java.lang.annotation.Annotation;
import java.util.Set;

/**
 * Looks up every CDI bean of type {@link IRedisStreamBaseConsumer} qualified with
 * {@link RedisStreamConsumer} and submits one or more {@link IRedisStreamConsumerExecutor}
 * instances per bean to the container's {@link ManagedExecutorService}.
 */
public class BaseRedisConsumerStarter {
    @Inject
    private Logger log;
    @Resource
    private ManagedExecutorService managedExecutorService;
    @Inject
    private StreamGroupConfig config;
    @Inject
    private BeanManager beanManager;

    /**
     * Resolves all {@link IRedisStreamBaseConsumer} beans carrying the {@link RedisStreamConsumer}
     * qualifier and passes each of them to {@link #handleConsumerBean(Bean)}.
     */
    public void start() {
        Set<Bean<?>> consumerBeans = beanManager.getBeans(IRedisStreamBaseConsumer.class, RedisStreamConsumer.LITERAL);
        consumerBeans.forEach(this::handleConsumerBean);
    }

    /**
     * Starts the executors for a single consumer bean.
     * <p>
     * The number of executors is the configured consumer thread count when the configuration
     * provides one, otherwise the {@code consumerThreadsCount()} value of the bean's
     * {@link RedisStreamConsumer} annotation.
     *
     * @param bean
     *            the consumer bean whose bean class carries the {@link RedisStreamConsumer} annotation
     */
    protected void handleConsumerBean(Bean<?> bean) {
        log.info("Found consumer: [{0}]", bean.getBeanClass());
        RedisStreamConsumer redisStreamConsumerAnnotation = AnnotationUtil.getAnnotation(bean.getBeanClass(), RedisStreamConsumer.class);
        // The stream group configuration is keyed by the annotation's group() value.
        config.setConfigKey(redisStreamConsumerAnnotation.group());
        int threads = config.getConsumerThreadsCount().orElse(redisStreamConsumerAnnotation.consumerThreadsCount());
        Instance<RedisStreamConsumerExecutor> consumerExecutor = CDI.current().select(RedisStreamConsumerExecutor.class);
        for (int i = 0; i < threads; i++) {
            // A separate executor instance is requested from the container for every thread.
            IRedisStreamConsumerExecutor executor = consumerExecutor.get();
            startThread(executor, redisStreamConsumerAnnotation, bean);
        }
    }

    /**
     * Initializes the given executor with the annotation's config key and group plus the consumer
     * bean, then submits it to the managed executor service.
     *
     * @param executor
     *            the executor to initialize and submit
     * @param redisStreamConsumerAnnotation
     *            the annotation supplying {@code configKey()} and {@code group()}
     * @param bean
     *            the consumer bean handed to the executor
     */
    protected void startThread(IRedisStreamConsumerExecutor executor, RedisStreamConsumer redisStreamConsumerAnnotation, Bean<?> bean) {
        executor.init(redisStreamConsumerAnnotation.configKey(), redisStreamConsumerAnnotation.group(), bean);
        log.info("Starting Redis stream consumer with executor, class [{0}] for configKey [{1}], group [{2}]...", bean.getBeanClass(),
                redisStreamConsumerAnnotation.configKey(), redisStreamConsumerAnnotation.group());
        // Explicit Runnable cast keeps the submit(Runnable) overload selected.
        managedExecutorService.submit((Runnable) executor);
        log.info("consumer class [{0}] started.", bean.getBeanClass());
    }
}

