/*
 * Decompiled with CFR 0.152.
 */
package hu.icellmobilsoft.coffee.module.redisstream.service;

import hu.icellmobilsoft.coffee.dto.exception.BaseException;
import hu.icellmobilsoft.coffee.dto.exception.InvalidParameterException;
import hu.icellmobilsoft.coffee.dto.exception.TechnicalException;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManager;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManagerConnection;
import hu.icellmobilsoft.coffee.module.redisstream.common.RedisStreamUtil;
import hu.icellmobilsoft.coffee.module.redisstream.config.StreamGroupConfig;
import hu.icellmobilsoft.coffee.se.logging.Logger;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject;
import java.lang.annotation.Annotation;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamGroupInfo;
import redis.clients.jedis.resps.StreamPendingEntry;

/**
 * Service for the Redis stream operations of a single consumer group.
 * <p>
 * The instance must be configured with {@link #setRedisManager(RedisManager)} and {@link #setGroup(String)} before use. Methods whose name
 * ends with {@code InCurrentConnection} (and {@link #handleGroup()}, {@link #consumeOne(String)}) call {@link RedisManager} without opening
 * a connection themselves; the other public operations wrap the call in their own {@link RedisManagerConnection}.
 * <p>
 * Every {@link LocalDateTime} accepted by this class is interpreted as a UTC date-time.
 */
@Dependent
public class RedisStreamService {
    /** Number of pending entries requested (and acknowledged) per round while cleaning up expired entries. */
    private static final int EXPIRED_MESSAGE_CLEANUP_BLOCK_SIZE = 1000;

    @Inject
    private Logger log;
    @Inject
    private StreamGroupConfig config;
    private RedisManager redisManager;
    private String group;

    /**
     * Returns the Redis key of the stream belonging to the configured group.
     *
     * @return stream key computed by {@link RedisStreamUtil#streamKey(String)} from {@link #getGroup()}
     */
    public String streamKey() {
        return RedisStreamUtil.streamKey(this.getGroup());
    }

    /**
     * Tells whether the stream is enabled in the group configuration.
     *
     * @return value of {@link StreamGroupConfig#isEnabled()}
     */
    public boolean isRedisStreamEnabled() {
        return this.config.isEnabled();
    }

    /**
     * Queries the number of elements in the stream ({@code XLEN}) using its own connection.
     *
     * @return element count, {@code 0} if the manager returned no value
     * @throws BaseException
     *             on Redis manager error
     */
    public Long count() throws BaseException {
        Long count = this.getRedisManager().runWithConnection(Jedis::xlen, "xlen", this.streamKey()).orElse(0L);
        if (this.log.isTraceEnabled()) {
            this.log.trace("[{0}] stream have [{1}] elements", this.streamKey(), count);
        }
        return count;
    }

    /**
     * Checks whether the configured group exists on the stream, opening a connection for the check.
     *
     * @return {@code true} if a group named {@link #getGroup()} is registered on the stream
     * @throws BaseException
     *             on Redis manager error
     */
    public boolean existGroup() throws BaseException {
        try (RedisManagerConnection ignored = this.getRedisManager().initConnection()) {
            return this.existsGroupInActiveConnection();
        }
    }

    /**
     * Creates the configured group on the stream ({@code XGROUP CREATE}) if it does not exist yet.
     * <p>
     * NOTE(review): no connection is opened here, so the caller presumably has to hold an active connection - confirm against the callers.
     *
     * @throws BaseException
     *             on Redis manager error
     */
    public void handleGroup() throws BaseException {
        if (this.existsGroupInActiveConnection()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Group [{0}] already exist", this.getGroup());
            }
            return;
        }
        // last argument asks Redis to create the stream as well if it is missing
        Optional<String> createGroupResult = this.getRedisManager().run(Jedis::xgroupCreate, "xgroupCreate", this.streamKey(), this.getGroup(),
                new StreamEntryID(), true);
        this.log.info("Stream group [{0}] on stream [{1}] created with result: [{2}]", this.getGroup(), this.streamKey(), createGroupResult);
    }

    /**
     * Looks up the configured group among the groups of the stream ({@code XINFO GROUPS}) without opening a connection.
     *
     * @return {@code true} if the group is found; {@code false} if it is missing or Redis answered with a {@link JedisDataException}
     * @throws BaseException
     *             on any other Redis manager error
     */
    private boolean existsGroupInActiveConnection() throws BaseException {
        try {
            Optional<List<StreamGroupInfo>> info = this.getRedisManager().run(Jedis::xinfoGroups, "xinfoGroups", this.streamKey());
            return info.isPresent()
                    && info.get().stream().map(StreamGroupInfo::getName).anyMatch(name -> StringUtils.equals(this.getGroup(), name));
        } catch (TechnicalException e) {
            if (!(e.getCause() instanceof JedisDataException)) {
                throw e;
            }
            // a data error from Redis is treated as "group does not exist"
            this.log.info("Redis exception during checking group [{0}]: [{1}]", this.streamKey(), e.getLocalizedMessage());
            return false;
        }
    }

    /**
     * Reads at most one not yet delivered entry from the stream on behalf of the given consumer ({@code XREADGROUP}).
     *
     * @param consumerIdentifier
     *            unique name of the consumer inside the group, must not be blank
     * @return the consumed entry, or empty if nothing was returned by the read
     * @throws BaseException
     *             {@link InvalidParameterException} on blank identifier, otherwise on Redis manager error
     */
    public Optional<StreamEntry> consumeOne(String consumerIdentifier) throws BaseException {
        if (StringUtils.isBlank(consumerIdentifier)) {
            throw new InvalidParameterException("consumerIdentifier is null");
        }
        Map<String, StreamEntryID> streamQuery = Map.of(this.streamKey(), StreamEntryID.UNRECEIVED_ENTRY);
        Optional<List<Map.Entry<String, List<StreamEntry>>>> result = this.getRedisManager().run(Jedis::xreadGroup, "xreadGroup",
                this.getGroup(), consumerIdentifier, this.createXReadGroupParams(), streamQuery);
        if (result.isEmpty() || result.get().isEmpty()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("No new message in [{0}] stream", this.streamKey());
            }
            return Optional.empty();
        }
        // only one stream is queried, so only the first result is relevant
        Map.Entry<String, List<StreamEntry>> stream = result.get().get(0);
        List<StreamEntry> entries = stream.getValue();
        if (entries == null || entries.isEmpty()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Stream key [{0}] in stream [{1}] no have values stream", stream.getKey(), this.streamKey());
            }
            return Optional.empty();
        }
        StreamEntry entry = entries.get(0);
        if (this.log.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder("Consumed one entry from:");
            sb.append("\nStream key [").append(stream.getKey()).append("] ");
            sb.append("\n  ID: [").append(entry.getID()).append("], values: [");
            entry.getFields().forEach((key, value) -> sb.append("\n    Key[").append(key).append("]: Value[").append(value).append("]"));
            sb.append("\n]");
            this.log.trace(sb.toString());
        }
        return Optional.of(entry);
    }

    /**
     * Builds the {@code XREADGROUP} parameters: one entry per read, blocking for the configured read timeout, and {@code NOACK} unless manual
     * acknowledgement is configured.
     *
     * @return parameters for {@link #consumeOne(String)}
     */
    protected XReadGroupParams createXReadGroupParams() {
        int readTimeoutMillis = this.config.getStreamReadTimeoutMillis().intValue();
        XReadGroupParams params = new XReadGroupParams().count(1).block(readTimeoutMillis);
        if (!this.config.isManualAck()) {
            params.noAck();
        }
        return params;
    }

    /**
     * Acknowledges the entry in the configured group ({@code XACK}) using its own connection.
     *
     * @param streamEntryID
     *            ID of the entry to acknowledge, {@code null} is ignored
     * @return number of acknowledged entries, {@code 0} for {@code null} input
     * @throws BaseException
     *             on Redis manager error
     */
    public long ack(StreamEntryID streamEntryID) throws BaseException {
        if (Objects.isNull(streamEntryID)) {
            return 0L;
        }
        try (RedisManagerConnection ignored = this.getRedisManager().initConnection()) {
            return this.ackInCurrentConnection(streamEntryID);
        }
    }

    /**
     * Acknowledges the entry in the configured group ({@code XACK}) without opening a connection.
     *
     * @param streamEntryID
     *            ID of the entry to acknowledge, {@code null} is ignored
     * @return number of acknowledged entries, {@code 0} for {@code null} input or if the manager returned no value
     * @throws BaseException
     *             on Redis manager error
     */
    public long ackInCurrentConnection(StreamEntryID streamEntryID) throws BaseException {
        if (Objects.isNull(streamEntryID)) {
            return 0L;
        }
        long successCount = this.getRedisManager()
                .run((jedis, key, groupName, id) -> jedis.xack(key, groupName, id), "xack", this.streamKey(), this.getGroup(), streamEntryID)
                .orElse(0L);
        if (this.log.isTraceEnabled()) {
            if (successCount > 0L) {
                this.log.trace("StreamEntryID [{0}] successfully ACKed", streamEntryID);
            } else {
                this.log.trace("StreamEntryID [{0}] not ACKed", streamEntryID);
            }
        }
        return successCount;
    }

    /**
     * Lists pending (delivered but not acknowledged) entries of the group ({@code XPENDING}) using its own connection.
     *
     * @param pendingCount
     *            maximum number of entries to return
     * @param from
     *            lower ID bound passed to {@link XPendingParams}
     * @param to
     *            upper ID bound passed to {@link XPendingParams}
     * @return pending entries as returned by the Redis manager
     * @throws BaseException
     *             on Redis manager error
     */
    public Optional<List<StreamPendingEntry>> pending(int pendingCount, StreamEntryID from, StreamEntryID to) throws BaseException {
        try (RedisManagerConnection ignored = this.getRedisManager().initConnection()) {
            return this.pendingInCurrentConnection(pendingCount, from, to);
        }
    }

    /**
     * Lists pending entries of the group ({@code XPENDING}) without opening a connection.
     *
     * @param pendingCount
     *            maximum number of entries to return
     * @param from
     *            lower ID bound passed to {@link XPendingParams}
     * @param to
     *            upper ID bound passed to {@link XPendingParams}
     * @return pending entries as returned by the Redis manager
     * @throws BaseException
     *             on Redis manager error
     */
    private Optional<List<StreamPendingEntry>> pendingInCurrentConnection(int pendingCount, StreamEntryID from, StreamEntryID to)
            throws BaseException {
        XPendingParams params = new XPendingParams(from, to, pendingCount);
        return this.getRedisManager().run(Jedis::xpending, "xpending", this.streamKey(), this.getGroup(), params);
    }

    /**
     * Lists pending entries whose ID is not newer than "now minus {@code expiryDuration}".
     *
     * @param pendingCount
     *            maximum number of entries to return
     * @param expiryDuration
     *            age limit; {@code null} yields an empty result
     * @return expired pending entries, empty if {@code expiryDuration} is {@code null}
     * @throws BaseException
     *             on Redis manager error
     */
    public Optional<List<StreamPendingEntry>> pendingExpired(int pendingCount, Duration expiryDuration) throws BaseException {
        if (expiryDuration == null) {
            return Optional.empty();
        }
        // "now" has to be taken in UTC because toStreamEntryID interprets the LocalDateTime as UTC
        LocalDateTime expiryLimit = LocalDateTime.now(ZoneOffset.UTC).minus(expiryDuration);
        return this.pending(pendingCount, (LocalDateTime) null, expiryLimit);
    }

    /**
     * Acknowledges every pending entry whose ID is not newer than "now minus {@code expiryDuration}", processing them in blocks of
     * {@value #EXPIRED_MESSAGE_CLEANUP_BLOCK_SIZE} inside a single connection.
     *
     * @param expiryDuration
     *            age limit; {@code null} makes the call a no-op
     * @return number of pending entries processed (the entries fetched for acknowledgement)
     * @throws BaseException
     *             on Redis manager error
     */
    public long removeExpiredPendingEntries(Duration expiryDuration) throws BaseException {
        if (expiryDuration == null) {
            return 0L;
        }
        long removedEntries = 0L;
        try (RedisManagerConnection ignored = this.getRedisManager().initConnection()) {
            List<StreamPendingEntry> pendingEntries;
            do {
                // "now" has to be taken in UTC because toStreamEntryID interprets the LocalDateTime as UTC
                StreamEntryID expiryLimit = RedisStreamService.toStreamEntryID(LocalDateTime.now(ZoneOffset.UTC).minus(expiryDuration));
                pendingEntries = this.pendingInCurrentConnection(EXPIRED_MESSAGE_CLEANUP_BLOCK_SIZE, null, expiryLimit).orElseGet(List::of);
                for (StreamPendingEntry pendingEntry : pendingEntries) {
                    this.ackInCurrentConnection(pendingEntry.getID());
                }
                removedEntries += pendingEntries.size();
                // a full block means there may be more expired entries left
            } while (pendingEntries.size() >= EXPIRED_MESSAGE_CLEANUP_BLOCK_SIZE);
        }
        return removedEntries;
    }

    /**
     * Lists pending entries between two date-times, both interpreted as UTC.
     *
     * @param pendingCount
     *            maximum number of entries to return
     * @param from
     *            lower bound, may be {@code null}
     * @param to
     *            upper bound, may be {@code null}
     * @return pending entries as returned by the Redis manager
     * @throws BaseException
     *             on Redis manager error
     */
    public Optional<List<StreamPendingEntry>> pending(int pendingCount, LocalDateTime from, LocalDateTime to) throws BaseException {
        return this.pending(pendingCount, RedisStreamService.toStreamEntryID(from), RedisStreamService.toStreamEntryID(to));
    }

    /**
     * Lists pending entries without ID bounds.
     *
     * @param pendingCount
     *            maximum number of entries to return
     * @return pending entries as returned by the Redis manager
     * @throws BaseException
     *             on Redis manager error
     */
    public Optional<List<StreamPendingEntry>> pending(int pendingCount) throws BaseException {
        return this.pending(pendingCount, (StreamEntryID) null, (StreamEntryID) null);
    }

    /**
     * Converts a UTC date-time to the first possible stream entry ID of that instant.
     * <p>
     * Redis stream IDs have the form {@code <millisecondsTime>-<sequenceNumber>}, so the time part is the epoch time in milliseconds and the
     * sequence is {@code 0}.
     *
     * @param localDateTime
     *            date-time interpreted as UTC, may be {@code null}
     * @return the matching stream entry ID, or {@code null} for {@code null} input
     */
    public static StreamEntryID toStreamEntryID(LocalDateTime localDateTime) {
        if (localDateTime == null) {
            return null;
        }
        long epochMillis = localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        return new StreamEntryID(epochMillis, 0L);
    }

    /**
     * Returns the Redis manager used for every Redis call of this service.
     *
     * @return the configured manager
     */
    public RedisManager getRedisManager() {
        return this.redisManager;
    }

    /**
     * Sets the Redis manager used for every Redis call of this service.
     *
     * @param redisManager
     *            manager to use
     */
    public void setRedisManager(RedisManager redisManager) {
        this.redisManager = redisManager;
    }

    /**
     * Returns the stream group name this service works with.
     *
     * @return group name
     */
    public String getGroup() {
        return this.group;
    }

    /**
     * Sets the stream group name and passes it to the group configuration as its config key.
     *
     * @param group
     *            group name
     */
    public void setGroup(String group) {
        this.group = group;
        this.config.setConfigKey(group);
    }

    /**
     * Selects this service type from the current CDI container.
     *
     * @return CDI {@link Instance} for {@link RedisStreamService}
     */
    public static Instance<RedisStreamService> instance() {
        return CDI.current().select(RedisStreamService.class);
    }
}

