/*
 * Decompiled with CFR 0.152.
 */
package hu.icellmobilsoft.coffee.module.redisstream.consumer;

import hu.icellmobilsoft.coffee.dto.exception.BaseException;
import hu.icellmobilsoft.coffee.module.redis.annotation.RedisConnection;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManager;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManagerConnection;
import hu.icellmobilsoft.coffee.module.redisstream.annotation.RedisStreamConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.bootstrap.ConsumerLifeCycleManager;
import hu.icellmobilsoft.coffee.module.redisstream.config.StreamGroupConfig;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamBaseConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamConsumerExecutor;
import hu.icellmobilsoft.coffee.module.redisstream.consumer.IRedisStreamPipeConsumer;
import hu.icellmobilsoft.coffee.module.redisstream.service.RedisStreamService;
import hu.icellmobilsoft.coffee.se.logging.Logger;
import hu.icellmobilsoft.coffee.se.logging.mdc.MDC;
import hu.icellmobilsoft.coffee.tool.utils.annotation.AnnotationUtil;
import hu.icellmobilsoft.coffee.tool.utils.string.RandomUtil;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.jboss.weld.context.bound.BoundRequestContext;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.resps.StreamEntry;

/**
 * Runnable executor that polls a Redis stream in a loop and dispatches every consumed entry to the configured
 * consumer bean inside a manually activated request scope.
 * <p>
 * The instance must be prepared with {@link #init(String, String, Bean)} before {@link #run()} is invoked.
 */
@Dependent
public class RedisStreamConsumerExecutor implements IRedisStreamConsumerExecutor {

    /** Prefix of the Redis error message that signals a missing stream group. */
    private static final String NOGROUP_PREFIX = "NOGROUP";

    /** Seconds to wait after a failed cycle before polling again. */
    private static final long ERROR_SLEEP_SECONDS = 30L;

    @Inject
    private Logger log;

    @Inject
    private RedisStreamService redisStreamService;

    @Inject
    private BeanManager beanManager;

    @Inject
    private BoundRequestContext boundRequestContext;

    @Inject
    private StreamGroupConfig streamGroupConfig;

    private String consumerIdentifier;

    private String redisConfigKey;

    private Bean<? super IRedisStreamBaseConsumer> consumerBean;

    /**
     * Stores the connection key and consumer bean, and passes the group to the stream service and the group config.
     *
     * @param redisConfigKey
     *            key used as the {@link RedisConnection} qualifier value when selecting the {@link RedisManager}
     * @param group
     *            stream group name
     * @param consumerBean
     *            CDI bean of the consumer that will receive the stream entries
     */
    @Override
    public void init(String redisConfigKey, String group, Bean<? super IRedisStreamBaseConsumer> consumerBean) {
        this.redisConfigKey = redisConfigKey;
        this.consumerBean = consumerBean;
        this.redisStreamService.setGroup(group);
        this.streamGroupConfig.setConfigKey(group);
    }

    /**
     * Polling loop. Runs until {@code ConsumerLifeCycleManager.ENDLOOP} becomes true.
     * <p>
     * In every cycle a fresh {@link RedisManager} is selected, a connection is opened, one entry is consumed and, if
     * present, handed over to {@link #consumeStreamEntry(StreamEntry, RedisManager)}. The group is (re)handled on the
     * first cycle and again after a {@code NOGROUP} error was detected.
     */
    public void startLoop() {
        ConsumerLifeCycleManager.CONSUMER_COUNTER.getAndIncrement();
        this.consumerIdentifier = RandomUtil.generateId();
        // true while the stream group still has to be checked before consuming
        boolean prudentRun = true;
        while (!ConsumerLifeCycleManager.ENDLOOP) {
            Optional<StreamEntry> streamEntry = Optional.empty();
            Instance<RedisManager> redisManagerInstance = CDI.current().select(RedisManager.class,
                    new RedisConnection.Literal(this.redisConfigKey));
            RedisManager redisManager = redisManagerInstance.get();
            // the connection handle is only held so that it gets closed at the end of the cycle
            try (RedisManagerConnection ignored = redisManager.initConnection()) {
                this.redisStreamService.setRedisManager(redisManager);
                if (prudentRun) {
                    this.redisStreamService.handleGroup();
                    prudentRun = false;
                }
                streamEntry = this.redisStreamService.consumeOne(this.consumerIdentifier);
                if (ConsumerLifeCycleManager.ENDLOOP) {
                    // shutdown was requested while waiting for an entry; the loop condition ends the loop
                    this.log.info("Skipping message processing because of shut down event.");
                    continue;
                }
                if (streamEntry.isPresent()) {
                    StreamEntry entry = streamEntry.get();
                    this.handleMDC(entry);
                    this.consumeStreamEntry(entry, redisManager);
                }
            } catch (BaseException e) {
                this.log.error(MessageFormat.format("Exception on consume streamEntry [{0}]: [{1}]", streamEntry, e.getLocalizedMessage()), e);
                Throwable cause = e.getCause();
                // only Redis data errors trigger the reconnect + back-off; other BaseExceptions go straight to the next cycle
                if (cause instanceof JedisDataException) {
                    String message = cause.getLocalizedMessage();
                    if (StringUtils.startsWith(message, NOGROUP_PREFIX)) {
                        this.log.error("Detected problem on redisConfigKey [{0}] with stream group [{1}] and activating prudentRun on next cycle. Exception: [{2}]", new Object[] { this.redisConfigKey, this.redisStreamService.getGroup(), message });
                        prudentRun = true;
                    } else {
                        this.log.error(MessageFormat.format("Exception on redisConfigKey [{0}] with stream group [{1}]: [{2}]", this.redisConfigKey, this.redisStreamService.getGroup(), message), cause);
                    }
                    redisManager.closeConnection();
                    this.sleep();
                }
            } catch (Throwable e) {
                this.log.error(MessageFormat.format("Exception during consume on redisConfigKey [{0}] with stream group [{1}]: [{2}]", this.redisConfigKey, this.redisStreamService.getGroup(), e.getLocalizedMessage()), e);
                redisManager.closeConnection();
                this.sleep();
            } finally {
                this.cleanup(redisManagerInstance, redisManager);
            }
        }
    }

    /**
     * Destroys the {@link RedisManager} instance obtained for the current cycle and clears the MDC. Any failure is
     * logged and not propagated.
     *
     * @param redisManagerInstance
     *            CDI instance the manager was obtained from
     * @param redisManager
     *            manager to destroy, may be {@code null}
     */
    private void cleanup(Instance<RedisManager> redisManagerInstance, RedisManager redisManager) {
        try {
            if (redisManager != null) {
                redisManagerInstance.destroy(redisManager);
            }
            MDC.clear();
        } catch (Throwable e) {
            this.log.error(MessageFormat.format("Exception during redisManager cleanup on redisConfigKey [{0}] with stream group [{1}]: [{2}]", this.redisConfigKey, this.redisStreamService.getGroup(), e.getLocalizedMessage()), e);
        }
    }

    /**
     * Processes one stream entry. When the group config reports manual ack, the entry is acknowledged after processing
     * and the after-ack callback is invoked; otherwise nothing happens after processing.
     *
     * @param streamEntry
     *            entry to process
     * @param redisManager
     *            manager of the current cycle (not used by this implementation)
     * @throws BaseException
     *             if processing, ack or the after-ack callback fails
     */
    protected void consumeStreamEntry(StreamEntry streamEntry, RedisManager redisManager) throws BaseException {
        Optional<Map<String, Object>> result = this.executeOnStream(streamEntry, 1);
        if (!this.streamGroupConfig.isManualAck()) {
            return;
        }
        this.ack(streamEntry.getID());
        this.afterAckInRequestScope(streamEntry, result.orElse(Collections.emptyMap()));
    }

    /**
     * Acknowledges the entry on the connection that is currently open.
     *
     * @param streamEntryID
     *            id of the entry to acknowledge
     * @throws BaseException
     *             if the ack call fails
     */
    protected void ack(StreamEntryID streamEntryID) throws BaseException {
        this.redisStreamService.ackInCurrentConnection(streamEntryID);
    }

    /**
     * Invokes the consumer and retries on {@link BaseException}. The retry limit is taken from the group config, falling
     * back to the {@code retryCount} of the consumer's {@link RedisStreamConsumer} annotation.
     *
     * @param streamEntry
     *            entry to process
     * @param counter
     *            1-based number of the current attempt
     * @return result of the consumer, empty if the consumer produces none
     * @throws BaseException
     *             the last failure once the retry limit is reached
     */
    protected Optional<Map<String, Object>> executeOnStream(StreamEntry streamEntry, int counter) throws BaseException {
        try {
            return this.onStreamInRequestScope(streamEntry);
        } catch (BaseException e) {
            RedisStreamConsumer redisStreamConsumerAnnotation = AnnotationUtil.getAnnotation(this.consumerBean.getBeanClass(), RedisStreamConsumer.class);
            this.streamGroupConfig.setConfigKey(redisStreamConsumerAnnotation.group());
            int retryCount = this.streamGroupConfig.getRetryCount().orElse(redisStreamConsumerAnnotation.retryCount());
            if (counter >= retryCount) {
                throw e;
            }
            String msg = MessageFormat.format("Exception occured on running class [{0}], trying again [{1}]/[{2}]", this.consumerBean.getBeanClass(), counter + 1, retryCount);
            if (this.log.isDebugEnabled()) {
                this.log.debug(msg, e);
            } else {
                // without debug level only the messages are logged, not the stack trace
                String causeMessage = Optional.ofNullable(e.getCause()).map(Throwable::getLocalizedMessage).orElse(null);
                this.log.info(MessageFormat.format("{0}: [{1}], cause: [{2}]", msg, e.getLocalizedMessage(), causeMessage));
            }
            return this.executeOnStream(streamEntry, counter + 1);
        }
    }

    /**
     * Calls {@code onStream} of the consumer bean within a freshly started request scope, which is always ended
     * afterwards.
     *
     * @param streamEntry
     *            entry to process
     * @return the map returned by an {@link IRedisStreamPipeConsumer} (empty if it returned {@code null}); empty for
     *         any other consumer type
     * @throws BaseException
     *             if the consumer fails
     */
    protected Optional<Map<String, Object>> onStreamInRequestScope(StreamEntry streamEntry) throws BaseException {
        Object consumer = this.getConsumerReference();
        Map<String, Object> requestScopeStore = null;
        try {
            requestScopeStore = new ConcurrentHashMap<>();
            this.startRequestScope(requestScopeStore);
            if (consumer instanceof IRedisStreamConsumer) {
                ((IRedisStreamConsumer) consumer).onStream(streamEntry);
            } else if (consumer instanceof IRedisStreamPipeConsumer) {
                // ofNullable: a consumer returning null is treated as "no result" instead of failing with NPE
                return Optional.ofNullable(((IRedisStreamPipeConsumer) consumer).onStream(streamEntry));
            }
            return Optional.empty();
        } finally {
            this.endRequestScope(requestScopeStore);
        }
    }

    /**
     * Calls {@code afterAck} of the consumer bean within a freshly started request scope. Does nothing unless the
     * consumer bean class implements {@link IRedisStreamPipeConsumer}.
     *
     * @param streamEntry
     *            the acknowledged entry
     * @param onStreamResult
     *            result previously returned by the consumer's {@code onStream}
     * @throws BaseException
     *             if the consumer fails
     */
    protected void afterAckInRequestScope(StreamEntry streamEntry, Map<String, Object> onStreamResult) throws BaseException {
        // the bean class must be a subtype of the pipe consumer interface, hence the interface is the receiver
        if (!IRedisStreamPipeConsumer.class.isAssignableFrom(this.consumerBean.getBeanClass())) {
            return;
        }
        Object consumer = this.getConsumerReference();
        Map<String, Object> requestScopeStore = null;
        try {
            requestScopeStore = new ConcurrentHashMap<>();
            this.startRequestScope(requestScopeStore);
            if (consumer instanceof IRedisStreamPipeConsumer) {
                ((IRedisStreamPipeConsumer) consumer).afterAck(streamEntry, onStreamResult);
            }
        } finally {
            this.endRequestScope(requestScopeStore);
        }
    }

    /**
     * Obtains a contextual reference of the consumer bean from the bean manager, using a new creational context.
     *
     * @return consumer reference
     */
    private Object getConsumerReference() {
        return this.beanManager.getReference(this.consumerBean, this.consumerBean.getBeanClass(), this.beanManager.createCreationalContext(this.consumerBean));
    }

    /**
     * Associates the given store with the bound request context and activates it.
     *
     * @param requestScopeDataStore
     *            backing store of the request scope
     */
    private void startRequestScope(Map<String, Object> requestScopeDataStore) {
        this.boundRequestContext.associate(requestScopeDataStore);
        this.boundRequestContext.activate();
    }

    /**
     * Invalidates and deactivates the bound request context, then dissociates the store even if that fails.
     *
     * @param requestScopeDataStore
     *            backing store passed to {@link #startRequestScope(Map)}, may be {@code null}
     */
    private void endRequestScope(Map<String, Object> requestScopeDataStore) {
        try {
            this.boundRequestContext.invalidate();
            this.boundRequestContext.deactivate();
        } finally {
            if (requestScopeDataStore != null) {
                this.boundRequestContext.dissociate(requestScopeDataStore);
            }
        }
    }

    /**
     * Back-off wait after a failed cycle. If interrupted, the interrupt flag of the current thread is restored.
     */
    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(ERROR_SLEEP_SECONDS);
        } catch (InterruptedException ex) {
            this.log.warn("Interrupted sleep.", ex);
            try {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // log the failure of the interrupt call itself, not the original interruption
                this.log.warn("Exception during interrupt.", e);
            }
        }
    }

    /**
     * Puts the {@code extSessionId} MDC key, using the entry's {@code extSessionId} field or, if that is missing, its
     * {@code message} field.
     *
     * @param streamEntry
     *            entry whose fields are read
     */
    protected void handleMDC(StreamEntry streamEntry) {
        Map<String, String> fieldMap = streamEntry.getFields();
        String flowId = fieldMap.getOrDefault("extSessionId", fieldMap.get("message"));
        MDC.put("extSessionId", flowId);
    }

    /**
     * @return identifier generated for this consumer when the loop was started, {@code null} before that
     */
    public String getConsumerIdentifier() {
        return this.consumerIdentifier;
    }

    /**
     * Runs the polling loop; afterwards destroys this bean instance and, if this was the last running consumer,
     * releases {@code ConsumerLifeCycleManager.SEMAPHORE}.
     */
    @Override
    public void run() {
        try {
            this.startLoop();
        } finally {
            try {
                CDI.current().destroy(this);
            } finally {
                // the counter bookkeeping must happen even if destroying the bean fails
                if (ConsumerLifeCycleManager.CONSUMER_COUNTER.decrementAndGet() == 0) {
                    ConsumerLifeCycleManager.SEMAPHORE.release();
                }
            }
        }
    }

    /**
     * @return consumer bean set in {@link #init(String, String, Bean)}
     */
    public Bean<? super IRedisStreamBaseConsumer> getConsumerBean() {
        return this.consumerBean;
    }
}

