/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.QueueMonitor;
import com.netflix.dyno.queues.redis.QueueUtils;
import com.netflix.dyno.queues.redis.sharding.ShardingStrategy;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.text.NumberFormat;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.params.ZAddParams;

public class RedisDynoQueue
implements DynoQueue {
    // SLF4J logger for this class (created per instance).
    private final Logger logger = LoggerFactory.getLogger(RedisDynoQueue.class);
    // Time source; clock.millis() is the basis for every score written to Redis.
    private final Clock clock;
    // Logical queue name, returned by getName().
    private final String queueName;
    // Immutable snapshot of all shard names, taken in the constructor.
    private final List<String> allShards;
    // Name of the shard this instance treats as local.
    private final String shardName;
    // Prefix used when composing Redis keys.
    private final String redisKeyPrefix;
    // Redis hash key holding message payloads: redisKeyPrefix + ".MESSAGE." + queueName.
    private final String messageStoreKey;
    // Queue key of the local shard: getQueueShardKey(queueName, shardName).
    private final String localQueueShard;
    // Added to clock.millis() to compute the score of entries in the unack set.
    private volatile int unackTime = 60;
    // Stopwatches and counters for queue operations.
    private final QueueMonitor monitor;
    // JSON (de)serializer for Message payloads.
    private final ObjectMapper om;
    // Connection used for writes and for reads that follow a write in the same operation.
    private volatile JedisCommands quorumConn;
    // Connection used for the peek-style reads (zrangeByScore, zrank, hget in peek).
    private volatile JedisCommands nonQuorumConn;
    // Ids prefetched from the local shard, consumed by pop().
    private final ConcurrentLinkedQueue<String> prefetchedIds;
    // Prefetched ids per queue-shard key, consumed by unsafePopAllShards().
    private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;
    // Single-threaded scheduler that runs the periodic unack processing task.
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    // NOTE(review): not referenced in the visible part of this file — confirm usage elsewhere.
    private final int retryCount = 2;
    // Chooses the target shard for each pushed message.
    private final ShardingStrategy shardingStrategy;
    // Selects atomicProcessUnacks (true) or processUnacks (false) as the scheduled task.
    private final boolean singleRingTopology;
    // Outstanding number of ids pop() still wants prefetched from the local shard.
    @VisibleForTesting
    AtomicInteger numIdsToPrefetch;
    // Outstanding number of ids unsafePopAllShards() still wants prefetched across shards.
    @VisibleForTesting
    AtomicInteger unsafeNumIdsToPrefetchAllShards;

    /**
     * Creates a queue whose unack-processing task is scheduled every 60000 ms.
     * Delegates to the constructor that takes {@code unackScheduleInMS}.
     */
    public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, ShardingStrategy shardingStrategy, boolean singleRingTopology) {
        this(redisKeyPrefix, queueName, allShards, shardName, 60000, shardingStrategy, singleRingTopology);
    }

    /**
     * Creates a queue that uses {@link Clock#systemDefaultZone()} as its time source.
     * Delegates to the constructor that takes an explicit {@link Clock}.
     */
    public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy, boolean singleRingTopology) {
        this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS, shardingStrategy, singleRingTopology);
    }

    public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy, boolean singleRingTopology) {
        this.clock = clock;
        this.redisKeyPrefix = redisKeyPrefix;
        this.queueName = queueName;
        this.allShards = ImmutableList.copyOf((Collection)allShards.stream().collect(Collectors.toList()));
        this.shardName = shardName;
        this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
        this.localQueueShard = this.getQueueShardKey(queueName, shardName);
        this.shardingStrategy = shardingStrategy;
        this.numIdsToPrefetch = new AtomicInteger(0);
        this.unsafeNumIdsToPrefetchAllShards = new AtomicInteger(0);
        this.singleRingTopology = singleRingTopology;
        this.om = QueueUtils.constructObjectMapper();
        this.monitor = new QueueMonitor(queueName, shardName);
        this.prefetchedIds = new ConcurrentLinkedQueue();
        this.unsafePrefetchedIdsAllShardsMap = new HashMap<String, ConcurrentLinkedQueue<String>>();
        for (String shard : allShards) {
            this.unsafePrefetchedIdsAllShardsMap.put(this.getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue());
        }
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        if (this.singleRingTopology) {
            this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> this.atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
        } else {
            this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> this.processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
        }
        this.logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + queueName);
    }

    /**
     * Sets the connection stored in {@code quorumConn}.
     *
     * @return this queue, for call chaining
     */
    public RedisDynoQueue withQuorumConn(JedisCommands quorumConn) {
        this.quorumConn = quorumConn;
        return this;
    }

    /**
     * Sets the connection stored in {@code nonQuorumConn}.
     *
     * @return this queue, for call chaining
     */
    public RedisDynoQueue withNonQuorumConn(JedisCommands nonQuorumConn) {
        this.nonQuorumConn = nonQuorumConn;
        return this;
    }

    /**
     * Sets the unack time, which is added to {@code clock.millis()} when a popped
     * message is placed in the unack set.
     *
     * @return this queue, for call chaining
     */
    public RedisDynoQueue withUnackTime(int unackTime) {
        this.unackTime = unackTime;
        return this;
    }

    /**
     * Sums the sizes of all per-shard prefetched-id queues.
     *
     * @return total number of ids currently prefetched across all shards
     */
    private int unsafeGetNumPrefetchedIds() {
        int total = 0;
        for (ConcurrentLinkedQueue<String> shardIds : this.unsafePrefetchedIdsAllShardsMap.values()) {
            total += shardIds.size();
        }
        return total;
    }

    /** Returns the queue name given at construction. */
    public String getName() {
        return this.queueName;
    }

    /** Returns the current unack time (see {@code withUnackTime}). */
    public int getUnackTime() {
        return this.unackTime;
    }

    /**
     * Stores each message payload in the message hash and adds its id to the queue
     * shard chosen by the sharding strategy.
     *
     * <p>The sorted-set score is {@code now + message timeout + priority / 100}.
     *
     * @param messages messages to enqueue
     * @return the ids of the given messages, in the same order
     */
    public List<String> push(List<Message> messages) {
        Stopwatch sw = this.monitor.start(this.monitor.push, messages.size());
        try {
            QueueUtils.execute("push", "(a shard in) " + this.queueName, () -> {
                for (Message message : messages) {
                    // Payload first, so the id is never queued without its body.
                    String payload = this.om.writeValueAsString(message);
                    this.quorumConn.hset(this.messageStoreKey, message.getId(), payload);

                    double priorityFraction = (double) message.getPriority() / 100.0;
                    double score = (double) (this.clock.millis() + message.getTimeout()) + priorityFraction;

                    String targetShard = this.shardingStrategy.getNextShard(this.allShards, message);
                    this.quorumConn.zadd(this.getQueueShardKey(this.queueName, targetShard), score, message.getId());
                }
                return messages;
            });
            return messages.stream().map(Message::getId).collect(Collectors.toList());
        } finally {
            sw.stop();
        }
    }

    /**
     * Returns up to {@code messageCount} due messages from the local shard without
     * removing them.
     *
     * @param messageCount maximum number of ids to look up
     * @return the deserialized messages, or an empty list when the id lookup yields null
     */
    public List<Message> peek(int messageCount) {
        Stopwatch sw = this.monitor.peek.start();
        try {
            Set<String> ids = this.peekIds(0, messageCount);
            if (ids != null) {
                return this.doPeekBodyHelper(ids);
            }
            return Collections.emptyList();
        } finally {
            sw.stop();
        }
    }

    /**
     * Like {@code peek}, but collects ids from every shard rather than only the local one.
     *
     * @param messageCount number of ids requested from the shard scan
     * @return the deserialized messages, or an empty list when the id lookup yields null
     */
    public List<Message> unsafePeekAllShards(int messageCount) {
        Stopwatch sw = this.monitor.peek.start();
        try {
            Set<String> ids = this.peekIdsAllShards(0, messageCount);
            if (ids != null) {
                return this.doPeekBodyHelper(ids);
            }
            return Collections.emptyList();
        } finally {
            sw.stop();
        }
    }

    /**
     * Reads message ids from the local shard whose score is at most {@code peekTillTs}.
     *
     * @param offset     number of matching ids to skip
     * @param count      maximum number of ids to return
     * @param peekTillTs upper score bound; {@code 0.0} means "current time + 1 ms"
     * @return the matching ids
     */
    private Set<String> peekIds(int offset, int count, double peekTillTs) {
        return QueueUtils.execute("peekIds", this.localQueueShard, () -> {
            double upperBound;
            if (peekTillTs == 0.0) {
                upperBound = (double) (this.clock.millis() + 1L);
            } else {
                upperBound = peekTillTs;
            }
            return this.doPeekIdsFromShardHelper(this.localQueueShard, upperBound, offset, count);
        });
    }

    /**
     * Reads ids from the local shard that are due now; passes {@code 0.0} as the
     * upper bound, which the three-argument overload replaces with current time + 1 ms.
     */
    private Set<String> peekIds(int offset, int count) {
        return this.peekIds(offset, count, 0.0);
    }

    /**
     * Collects up to {@code count} due message ids, taking them from the local shard
     * first and then from the remaining shards until the request is satisfied.
     *
     * <p>Each remote shard is asked only for the number of ids still missing, so the
     * result never exceeds {@code count}; remote shards are not queried at all once
     * the local shard has supplied enough ids.
     *
     * @param offset number of matching ids to skip in each shard
     * @param count  maximum total number of ids to return
     * @return ids gathered across shards
     */
    private Set<String> peekIdsAllShards(int offset, int count) {
        return QueueUtils.execute("peekIdsAllShards", this.localQueueShard, () -> {
            Set<String> scanned = new HashSet<String>();
            // One timestamp for all shards so every shard is peeked up to the same instant.
            double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();

            // Try to satisfy the whole request from the local shard.
            scanned.addAll(this.peekIds(offset, count, now));
            int remaining = count - scanned.size();

            for (String shard : this.allShards) {
                if (remaining <= 0) {
                    break;
                }
                String queueShardName = this.getQueueShardKey(this.queueName, shard);
                // The local shard was already scanned above.
                if (queueShardName.equals(this.localQueueShard)) {
                    continue;
                }
                // Ask only for what is still missing; a non-positive limit is never sent.
                Set<String> elems = this.doPeekIdsFromShardHelper(queueShardName, now, offset, remaining);
                scanned.addAll(elems);
                remaining -= elems.size();
            }
            return scanned;
        });
    }

    /**
     * Runs {@code zrangeByScore} on the given queue shard key over the non-quorum
     * connection, with score range {@code [0.0, peekTillTs]} and the given offset/count.
     */
    private Set<String> doPeekIdsFromShardHelper(String queueShardName, double peekTillTs, int offset, int count) {
        return this.nonQuorumConn.zrangeByScore(queueShardName, 0.0, peekTillTs, offset, count);
    }

    /**
     * Loads and deserializes the payloads for the given message ids.
     *
     * <p>An id whose payload is no longer in the message hash (for example because
     * it was acked or removed after the ids were peeked) is logged and skipped,
     * matching how {@code _pop} treats a missing payload, instead of failing the
     * whole peek when {@code null} JSON reaches the object mapper.
     *
     * @param message_ids ids to look up
     * @return messages for the ids that still have a payload, in iteration order
     */
    private List<Message> doPeekBodyHelper(Set<String> message_ids) {
        return QueueUtils.execute("peek", this.messageStoreKey, () -> {
            List<Message> messages = new ArrayList<Message>(message_ids.size());
            for (String id : message_ids) {
                String json = this.nonQuorumConn.hget(this.messageStoreKey, id);
                if (json == null) {
                    this.logger.warn("Cannot get the message payload for {}", id);
                    continue;
                }
                messages.add(this.om.readValue(json, Message.class));
            }
            return messages;
        });
    }

    /**
     * Pops up to {@code messageCount} messages from the local shard, polling for
     * ids every 200 ms until enough are prefetched or the wait time has elapsed.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait         maximum time to wait for enough ids
     * @param unit         unit of {@code wait}
     * @return the popped messages (possibly fewer than requested)
     * @throws RuntimeException wrapping any exception raised while popping
     */
    public List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch sw = this.monitor.start(this.monitor.pop, messageCount);
        try {
            long start = this.clock.millis();
            long waitFor = unit.toMillis(wait);
            this.numIdsToPrefetch.addAndGet(messageCount);
            this.prefetchIds();
            while (true) {
                if (this.prefetchedIds.size() >= messageCount) {
                    break;
                }
                if (this.clock.millis() - start >= waitFor) {
                    break;
                }
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                this.prefetchIds();
            }
            return this._pop(this.shardName, messageCount, this.prefetchedIds);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            sw.stop();
        }
    }

    /**
     * Pops the message with the given id from the local shard, logging a warning
     * when the id is not in that shard. Delegates to {@code popWithMsgIdHelper}.
     */
    public Message popWithMsgId(String messageId) {
        return this.popWithMsgIdHelper(messageId, this.shardName, true);
    }

    /**
     * Tries each shard in turn to pop the message with the given id.
     *
     * <p>The "not found" warning is enabled only for the last shard tried, so a miss
     * in earlier shards stays quiet.
     *
     * @param messageId id of the message to pop
     * @return the popped message, or {@code null} when no shard yields it
     */
    public Message unsafePopWithMsgIdAllShards(String messageId) {
        int shardsLeft = this.allShards.size();
        for (String shard : this.allShards) {
            shardsLeft--;
            boolean isLastShard = shardsLeft == 0;
            Message found = this.popWithMsgIdHelper(messageId, shard, isLastShard);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    /**
     * Pops one specific message from the given shard: checks that the id is queued,
     * loads its payload, adds the id to the shard's unack set (only if absent) and
     * removes it from the queue shard.
     *
     * <p>Every failed step increments the {@code misses} counter and yields {@code null}.
     *
     * @param messageId       id of the message to pop
     * @param targetShard     shard to pop from
     * @param warnIfNotExists whether to log a warning when the id is not in the shard
     * @return the deserialized message, or {@code null} if any step failed
     */
    public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {
        Stopwatch sw = this.monitor.start(this.monitor.pop, 1);
        try {
            return QueueUtils.execute("popWithMsgId", targetShard, () -> {
                String queueShardName = this.getQueueShardKey(this.queueName, targetShard);
                double unackScore = Long.valueOf(this.clock.millis() + (long) this.unackTime).doubleValue();
                String unackShardName = this.getUnackKey(this.queueName, targetShard);
                // NX: never overwrite an unack entry that already exists.
                ZAddParams zParams = ZAddParams.zAddParams().nx();

                Long exists = this.nonQuorumConn.zrank(queueShardName, messageId);
                if (exists == null) {
                    if (warnIfNotExists) {
                        this.logger.warn("Cannot find the message with ID {}", messageId);
                    }
                    this.monitor.misses.increment();
                    return null;
                }

                String json = this.quorumConn.hget(this.messageStoreKey, messageId);
                if (json == null) {
                    this.logger.warn("Cannot get the message payload for {}", messageId);
                    this.monitor.misses.increment();
                    return null;
                }

                long added = this.quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
                if (added == 0L) {
                    this.logger.warn("cannot add {} to the unack shard {}", messageId, unackShardName);
                    this.monitor.misses.increment();
                    return null;
                }

                long removed = this.quorumConn.zrem(queueShardName, messageId);
                if (removed == 0L) {
                    // Log the message id and the shard key, as _pop does for the same failure.
                    this.logger.warn("cannot remove {} from the queue shard {}", messageId, queueShardName);
                    this.monitor.misses.increment();
                    return null;
                }

                return this.om.readValue(json, Message.class);
            });
        } finally {
            sw.stop();
        }
    }

    /**
     * Pops up to {@code messageCount} messages, first from the local shard and then
     * from every shard listed in {@code allShards}, polling for ids every 200 ms
     * until enough are prefetched or the wait time has elapsed.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait         maximum time to wait for enough ids
     * @param unit         unit of {@code wait}
     * @return the popped messages (possibly fewer than requested)
     * @throws RuntimeException wrapping any exception raised while popping
     */
    public List<Message> unsafePopAllShards(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch sw = this.monitor.start(this.monitor.pop, messageCount);
        try {
            long start = this.clock.millis();
            long waitFor = unit.toMillis(wait);
            this.unsafeNumIdsToPrefetchAllShards.addAndGet(messageCount);
            this.prefetchIdsAllShards();
            while (true) {
                if (this.unsafeGetNumPrefetchedIds() >= messageCount) {
                    break;
                }
                if (this.clock.millis() - start >= waitFor) {
                    break;
                }
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                this.prefetchIdsAllShards();
            }

            // Local shard first, then whatever is still needed from each shard's prefetched ids.
            List<Message> popped = this._pop(this.shardName, messageCount, this.unsafePrefetchedIdsAllShardsMap.get(this.localQueueShard));
            int stillNeeded = messageCount - popped.size();
            for (String shard : this.allShards) {
                String shardKey = this.getQueueShardKey(this.queueName, shard);
                List<Message> fromShard = this._pop(shard, stillNeeded, this.unsafePrefetchedIdsAllShardsMap.get(shardKey));
                popped.addAll(fromShard);
                stillNeeded -= fromShard.size();
            }
            return popped;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            sw.stop();
        }
    }

    /**
     * Prefetches due ids from the local shard into {@code prefetchedIds}; when
     * nothing was fetched the outstanding prefetch counter is reset to zero.
     */
    private void prefetchIds() {
        double now = (double) (this.clock.millis() + 1L);
        int fetched = this.doPrefetchIdsHelper(this.localQueueShard, this.numIdsToPrefetch, this.prefetchedIds, now);
        if (fetched != 0) {
            return;
        }
        this.numIdsToPrefetch.set(0);
    }

    /**
     * Prefetches due ids into the per-shard id queues: the local shard first, then,
     * if ids are still wanted afterwards, every other shard in turn.
     */
    private void prefetchIdsAllShards() {
        double now = (double) (this.clock.millis() + 1L);
        AtomicInteger wanted = this.unsafeNumIdsToPrefetchAllShards;

        this.doPrefetchIdsHelper(this.localQueueShard, wanted, this.unsafePrefetchedIdsAllShardsMap.get(this.localQueueShard), now);
        if (wanted.get() >= 1) {
            for (String shard : this.allShards) {
                String shardKey = this.getQueueShardKey(this.queueName, shard);
                // The local shard was handled above.
                if (!shardKey.equals(this.localQueueShard)) {
                    this.doPrefetchIdsHelper(shardKey, wanted, this.unsafePrefetchedIdsAllShardsMap.get(shardKey), now);
                }
            }
        }
    }

    /**
     * Fetches up to {@code prefetchCounter} due ids from one queue shard, appends
     * them to the given id queue and lowers the counter by the number fetched
     * (clamped at zero).
     *
     * @param queueShardName    queue shard key to read from
     * @param prefetchCounter   number of ids still wanted; nothing is done when below 1
     * @param prefetchedIdQueue destination for fetched ids
     * @param prefetchFromTs    upper score bound for the fetch
     * @return number of ids fetched by this call
     */
    private int doPrefetchIdsHelper(String queueShardName, AtomicInteger prefetchCounter, ConcurrentLinkedQueue<String> prefetchedIdQueue, double prefetchFromTs) {
        if (prefetchCounter.get() < 1) {
            return 0;
        }
        int wanted = prefetchCounter.get();
        Stopwatch sw = this.monitor.start(this.monitor.prefetch, wanted);
        try {
            Set<String> ids = this.doPeekIdsFromShardHelper(queueShardName, prefetchFromTs, 0, wanted);
            prefetchedIdQueue.addAll(ids);
            int fetched = ids.size();
            // Never leave the outstanding counter negative.
            if (prefetchCounter.addAndGet(-fetched) < 0) {
                prefetchCounter.set(0);
            }
            return fetched;
        } finally {
            sw.stop();
        }
    }

    /**
     * Drains prefetched ids and pops the corresponding messages from one shard
     * until {@code messageCount} messages are collected or the id queue is empty.
     *
     * <p>For each id: add it to the unack set (only if absent), remove it from the
     * queue shard, then load the payload. A failure at any step is logged, counted
     * as a miss, and the id is skipped.
     *
     * @param shard             shard to pop from
     * @param messageCount      maximum number of messages to return
     * @param prefetchedIdQueue candidate ids for this shard
     * @return the popped messages
     * @throws Exception if payload deserialization or a Redis call fails
     */
    private List<Message> _pop(String shard, int messageCount, ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
        String queueShardName = this.getQueueShardKey(this.queueName, shard);
        String unackShardName = this.getUnackKey(this.queueName, shard);
        double unackScore = (double) (this.clock.millis() + (long) this.unackTime);
        ZAddParams zParams = ZAddParams.zAddParams().nx();
        List<Message> popped = new LinkedList<>();

        while (popped.size() != messageCount) {
            String msgId = prefetchedIdQueue.poll();
            if (msgId == null) {
                break;
            }

            long added = this.quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
            if (added == 0L) {
                this.logger.warn("cannot add {} to the unack shard {}", msgId, unackShardName);
                this.monitor.misses.increment();
                continue;
            }

            long removed = this.quorumConn.zrem(queueShardName, msgId);
            if (removed == 0L) {
                this.logger.warn("cannot remove {} from the queue shard {}", msgId, queueShardName);
                this.monitor.misses.increment();
                continue;
            }

            String json = this.quorumConn.hget(this.messageStoreKey, msgId);
            if (json == null) {
                this.logger.warn("Cannot get the message payload for {}", msgId);
                this.monitor.misses.increment();
                continue;
            }

            popped.add(this.om.readValue(json, Message.class));
        }
        return popped;
    }

    /**
     * Acknowledges a message: removes its id from the first unack set that holds it
     * and deletes its payload from the message hash.
     *
     * @param messageId id of the message to acknowledge
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     */
    public boolean ack(String messageId) {
        Stopwatch sw = this.monitor.ack.start();
        try {
            return QueueUtils.execute("ack", "(a shard in) " + this.queueName, () -> {
                for (String shard : this.allShards) {
                    String unackShardKey = this.getUnackKey(this.queueName, shard);
                    Long removed = this.quorumConn.zrem(unackShardKey, messageId);
                    if (removed > 0L) {
                        this.quorumConn.hdel(this.messageStoreKey, messageId);
                        return true;
                    }
                }
                return false;
            });
        } finally {
            sw.stop();
        }
    }

    /**
     * Acknowledges every message in the list, one at a time, by id.
     *
     * @param messages messages to acknowledge
     */
    public void ack(List<Message> messages) {
        messages.forEach(message -> this.ack(message.getId()));
    }

    /**
     * Re-scores a message in the first unack set that holds it, setting the score
     * to {@code now + timeout}.
     *
     * @param messageId id of the unacked message
     * @param timeout   offset added to the current clock time for the new score
     * @return {@code true} if the id was found in an unack set, {@code false} otherwise
     */
    public boolean setUnackTimeout(String messageId, long timeout) {
        Stopwatch sw = this.monitor.ack.start();
        try {
            return QueueUtils.execute("setUnackTimeout", "(a shard in) " + this.queueName, () -> {
                double newScore = (double) (this.clock.millis() + timeout);
                for (String shard : this.allShards) {
                    String unackShardKey = this.getUnackKey(this.queueName, shard);
                    if (this.quorumConn.zscore(unackShardKey, messageId) != null) {
                        this.quorumConn.zadd(unackShardKey, newScore, messageId);
                        return true;
                    }
                }
                return false;
            });
        } finally {
            sw.stop();
        }
    }

    /**
     * Changes the timeout of a queued message: re-scores it in the first queue
     * shard that holds it and rewrites its stored payload with the new timeout.
     *
     * <p>The new score is {@code now + timeout + priority / 100}, using the same
     * floating-point priority fraction as {@code push}, so the priority ordering
     * among messages due at the same millisecond is kept.
     *
     * @param messageId id of the message to update
     * @param timeout   new timeout, added to the current clock time for the score
     * @return {@code true} if the message was found and updated; {@code false} if
     *         its payload is missing or no queue shard contains it
     */
    public boolean setTimeout(String messageId, long timeout) {
        return QueueUtils.execute("setTimeout", "(a shard in) " + this.queueName, () -> {
            String json = this.nonQuorumConn.hget(this.messageStoreKey, messageId);
            if (json == null) {
                return false;
            }
            Message message = (Message) this.om.readValue(json, Message.class);
            message.setTimeout(timeout);
            for (String shard : this.allShards) {
                String queueShard = this.getQueueShardKey(this.queueName, shard);
                Double score = this.quorumConn.zscore(queueShard, messageId);
                if (score == null) {
                    continue;
                }
                // Floating-point division: integer division would truncate the fraction.
                double priorityd = (double) message.getPriority() / 100.0;
                double newScore = Long.valueOf(this.clock.millis() + timeout).doubleValue() + priorityd;
                // XX: only update an existing member, never re-add a concurrently popped id.
                ZAddParams params = ZAddParams.zAddParams().xx();
                this.quorumConn.zadd(queueShard, newScore, messageId, params);
                json = this.om.writeValueAsString(message);
                this.quorumConn.hset(this.messageStoreKey, message.getId(), json);
                return true;
            }
            return false;
        });
    }

    /**
     * Removes a message from the queue: deletes its id from the unack set and the
     * queue shard of each shard in turn and, as soon as either removal succeeds,
     * deletes its payload from the message hash.
     *
     * <p>A message that is only in an unack set (popped but not yet acked) counts
     * as removed as well, so its payload is not left behind in the message hash;
     * this matches {@code atomicRemove}, which reports success when any of the
     * structures held the id.
     *
     * @param messageId id of the message to remove
     * @return {@code true} if the id was removed from a queue shard or an unack set
     */
    public boolean remove(String messageId) {
        Stopwatch sw = this.monitor.remove.start();
        try {
            return QueueUtils.execute("remove", "(a shard in) " + this.queueName, () -> {
                for (String shard : this.allShards) {
                    String unackShardKey = this.getUnackKey(this.queueName, shard);
                    Long removedFromUnack = this.quorumConn.zrem(unackShardKey, messageId);
                    String queueShardKey = this.getQueueShardKey(this.queueName, shard);
                    Long removedFromQueue = this.quorumConn.zrem(queueShardKey, messageId);
                    if (removedFromQueue > 0L || removedFromUnack > 0L) {
                        // Result ignored: the payload only needs to be gone.
                        this.quorumConn.hdel(this.messageStoreKey, messageId);
                        return true;
                    }
                }
                return false;
            });
        } finally {
            sw.stop();
        }
    }

    /**
     * Removes a message from every queue shard, every unack set and the message
     * hash in a single Lua script evaluated on the quorum connection.
     *
     * <p>Script arguments: the message id, the shard count, then one
     * (queue shard key, unack key) pair per shard.
     *
     * @param messageId id of the message to remove
     * @return {@code true} when the script returns 1
     */
    public boolean atomicRemove(String messageId) {
        Stopwatch sw = this.monitor.remove.start();
        try {
            return QueueUtils.execute("remove", "(a shard in) " + this.queueName, () -> {
                String atomicRemoveScript = "local hkey=KEYS[1]\nlocal msg_id=ARGV[1]\nlocal num_shards=ARGV[2]\n\nlocal removed_shard=0\nlocal removed_unack=0\nlocal removed_hash=0\nfor i=0,num_shards-1 do\n  local shard_name = ARGV[3+(i*2)]\n  local unack_name = ARGV[3+(i*2)+1]\n\n  removed_shard = removed_shard + redis.call('zrem', shard_name, msg_id)\n  removed_unack = removed_unack + redis.call('zrem', unack_name, msg_id)\nend\n\nremoved_hash = redis.call('hdel', hkey, msg_id)\nif (removed_shard==1 or removed_unack==1 or removed_hash==1) then\n  return 1\nend\nreturn removed_unack\n";

                ImmutableList.Builder<String> scriptArgs = ImmutableList.builder();
                scriptArgs.add(messageId);
                scriptArgs.add(Integer.toString(this.allShards.size()));
                for (String shard : this.allShards) {
                    scriptArgs.add(this.getQueueShardKey(this.queueName, shard));
                    scriptArgs.add(this.getUnackKey(this.queueName, shard));
                }

                List<String> keys = Collections.singletonList(this.messageStoreKey);
                Long removed = (Long) ((DynoJedisClient) this.quorumConn).eval(atomicRemoveScript, keys, scriptArgs.build());
                return removed == 1L;
            });
        } finally {
            sw.stop();
        }
    }

    /**
     * Pushes the message only if its id is in neither a queue shard nor an unack
     * set of any shard.
     *
     * @param message message to enqueue if absent
     * @return {@code true} if the message was pushed, {@code false} if its id was already present
     */
    public boolean ensure(Message message) {
        return QueueUtils.execute("ensure", "(a shard in) " + this.queueName, () -> {
            String messageId = message.getId();
            for (String shard : this.allShards) {
                String queueShard = this.getQueueShardKey(this.queueName, shard);
                if (this.quorumConn.zscore(queueShard, messageId) != null) {
                    return false;
                }
                String unackShardKey = this.getUnackKey(this.queueName, shard);
                if (this.quorumConn.zscore(unackShardKey, messageId) != null) {
                    return false;
                }
            }
            this.push(Collections.singletonList(message));
            return true;
        });
    }

    /** Checks all stored messages for the predicate (not restricted to the local shard). */
    public boolean containsPredicate(String predicate) {
        return this.containsPredicate(predicate, false);
    }

    /** Looks up a matching message id across all stored messages (not restricted to the local shard). */
    public String getMsgWithPredicate(String predicate) {
        return this.getMsgWithPredicate(predicate, false);
    }

    /**
     * Reports whether {@code getMsgWithPredicate(predicate, localShardOnly)} finds a message.
     *
     * @return {@code true} when a matching message id is returned, {@code false} when it is null
     */
    public boolean containsPredicate(String predicate, boolean localShardOnly) {
        return QueueUtils.execute("containsPredicate", this.messageStoreKey, () -> this.getMsgWithPredicate(predicate, localShardOnly) != null);
    }

    /**
     * Scans the message hash with a Lua script and returns the id of the first
     * message whose payload matches {@code predicate} (a Lua {@code string.match} pattern).
     *
     * @param predicate      Lua pattern applied to each stored payload
     * @param localShardOnly when true, a match counts only if its id is also in the local queue shard
     * @return the matching message id, or {@code null} when the script returns nil
     */
    public String getMsgWithPredicate(String predicate, boolean localShardOnly) {
        return QueueUtils.execute("getMsgWithPredicate", this.messageStoreKey, () -> {
            DynoJedisClient client = (DynoJedisClient) this.nonQuorumConn;
            List<String> keys = Collections.singletonList(this.messageStoreKey);
            if (localShardOnly) {
                String predicateCheckLocalOnlyLuaScript = "local hkey=KEYS[1]\nlocal predicate=ARGV[1]\nlocal shard_name=ARGV[2]\nlocal cursor=0\nlocal begin=false\nwhile (cursor ~= 0 or begin==false) do\n    local ret = redis.call('hscan', hkey, cursor)\nlocal curmsgid\nfor i, content in ipairs(ret[2]) do\n    if (i % 2 ~= 0) then\n        curmsgid = content\nelseif (string.match(content, predicate)) then\nlocal in_local_shard = redis.call('zrank', shard_name, curmsgid)\nif (type(in_local_shard) ~= 'boolean' and in_local_shard >= 0) then\nreturn curmsgid\nend\n        end\nend\n        cursor=tonumber(ret[1])\nbegin=true\nend\nreturn nil";
                return (String) client.eval(predicateCheckLocalOnlyLuaScript, keys, ImmutableList.of(predicate, this.localQueueShard));
            }
            String predicateCheckAllLuaScript = "local hkey=KEYS[1]\nlocal predicate=ARGV[1]\nlocal cursor=0\nlocal begin=false\nwhile (cursor ~= 0 or begin==false) do\n  local ret = redis.call('hscan', hkey, cursor)\n  local curmsgid\n  for i, content in ipairs(ret[2]) do\n    if (i % 2 ~= 0) then\n      curmsgid = content\n    elseif (string.match(content, predicate)) then\n      return curmsgid\n    end\n  end\n  cursor=tonumber(ret[1])\n  begin=true\nend\nreturn nil";
            return (String) client.eval(predicateCheckAllLuaScript, keys, Collections.singletonList(predicate));
        });
    }

    private Message popMsgWithPredicateObeyPriority(String predicate, boolean localShardOnly) {
        ArrayList retval;
        String popPredicateObeyPriority = "local hkey=KEYS[1]\nlocal predicate=ARGV[1]\nlocal num_shards=ARGV[2]\nlocal peek_until=tonumber(ARGV[3])\nlocal unack_score=tonumber(ARGV[4])\n\nlocal shard_names={}\nlocal unack_names={}\nlocal shard_lengths={}\nlocal largest_shard=-1\nfor i=0,num_shards-1 do\n  shard_names[i+1]=ARGV[5+(i*2)]\n  shard_lengths[i+1] = redis.call('zcard', shard_names[i+1])\n  unack_names[i+1]=ARGV[5+(i*2)+1]\n\n  if (shard_lengths[i+1] > largest_shard) then\n    largest_shard = shard_lengths[i+1]\n  end\nend\n\nlocal min_score=-1\nlocal min_member\nlocal matching_value\nlocal owning_shard_idx=-1\n\nlocal num_complete_shards=0\nfor j=0,largest_shard-1 do\n  for i=1,num_shards do\n    local skiploop=false\n    if (shard_lengths[i] < j+1) then\n      skiploop=true\n    end\n\n    if (skiploop == false) then\n      local element = redis.call('zrange', shard_names[i], j, j, 'WITHSCORES')\n      if ((min_score ~= -1 and min_score < tonumber(element[2])) or peek_until < tonumber(element[2])) then\n        -- This is to make sure we don't process this shard again\n        -- since all elements henceforth are of lower priority than min_member\n        shard_lengths[i]=0\n        num_complete_shards = num_complete_shards + 1\n      else\n        local value = redis.call('hget', hkey, tostring(element[1]))\n        if (value) then\n          if (string.match(value, predicate)) then\n            if (min_score == -1 or tonumber(element[2]) < min_score) then\n              min_score = tonumber(element[2])\n              owning_shard_idx=i\n              min_member = element[1]\n              matching_value = value\n            end\n          end\n        end\n      end\n    end\n  end\n  if (num_complete_shards == num_shards) then\n    break\n  end\nend\n\nif (min_member) then\n  local queue_shard_name=shard_names[owning_shard_idx]\n  local unack_shard_name=unack_names[owning_shard_idx]\n  local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', 
unack_score, min_member)\n  if (zadd_ret) then\n    redis.call('zrem', queue_shard_name, min_member)\n  end\nend\nreturn {min_member, matching_value}";
        double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();
        double unackScore = Long.valueOf(this.clock.millis() + (long)this.unackTime).doubleValue();
        NumberFormat fmt = NumberFormat.getIntegerInstance();
        fmt.setGroupingUsed(false);
        String nowScoreString = fmt.format(now);
        String unackScoreString = fmt.format(unackScore);
        if (localShardOnly) {
            String unackShardName = this.getUnackKey(this.queueName, this.shardName);
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add((Object)predicate);
            builder.add((Object)Integer.toString(1));
            builder.add((Object)nowScoreString);
            builder.add((Object)unackScoreString);
            builder.add((Object)this.localQueueShard);
            builder.add((Object)unackShardName);
            retval = (ArrayList)((DynoJedisClient)this.quorumConn).eval(popPredicateObeyPriority, Collections.singletonList(this.messageStoreKey), (List)builder.build());
        } else {
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add((Object)predicate);
            builder.add((Object)Integer.toString(this.allShards.size()));
            builder.add((Object)nowScoreString);
            builder.add((Object)unackScoreString);
            for (String shard : this.allShards) {
                String queueShard = this.getQueueShardKey(this.queueName, shard);
                String unackShardName = this.getUnackKey(this.queueName, shard);
                builder.add((Object)queueShard);
                builder.add((Object)unackShardName);
            }
            retval = (ArrayList)((DynoJedisClient)this.quorumConn).eval(popPredicateObeyPriority, Collections.singletonList(this.messageStoreKey), (List)builder.build());
        }
        if (retval.size() == 0) {
            return null;
        }
        return new Message((String)retval.get(0), (String)retval.get(1));
    }

    /**
     * Pops a single message selected by {@code predicate}, timing the call with the
     * {@code pop} monitor.
     *
     * <p>The work is delegated to {@code popMsgWithPredicateObeyPriority}, wrapped in
     * {@code QueueUtils.execute}.
     *
     * @param predicate      pattern forwarded unchanged to the delegated pop
     * @param localShardOnly forwarded unchanged to the delegated pop
     * @return the value produced by the delegated pop (may be {@code null})
     */
    public Message popMsgWithPredicate(String predicate, boolean localShardOnly) {
        final Stopwatch timer = this.monitor.start(this.monitor.pop, 1);
        try {
            return QueueUtils.execute("popMsgWithPredicateObeyPriority", this.messageStoreKey, () -> this.popMsgWithPredicateObeyPriority(predicate, localShardOnly));
        }
        finally {
            // Always stop the timer, whether the pop succeeded or threw.
            timer.stop();
        }
    }

    /**
     * Pops up to {@code messageCount} messages using the ids held in {@code prefetchedIds},
     * restricted to the local shard (the helper is called with {@code localShardOnly = true}).
     *
     * <p>The method requests {@code messageCount} more ids to be prefetched, then re-runs
     * {@code prefetchIds()} every 200 ms until enough ids are available or the wait budget
     * is used up.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait         maximum time to wait for enough prefetched ids
     * @param unit         unit of {@code wait}
     * @return the popped messages, possibly fewer than {@code messageCount}
     * @throws RuntimeException wrapping any exception raised while prefetching or popping
     */
    public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch sw = this.monitor.start(this.monitor.pop, messageCount);
        try {
            long start = this.clock.millis();
            long waitFor = unit.toMillis(wait);
            this.numIdsToPrefetch.addAndGet(messageCount);
            this.prefetchIds();
            while (this.prefetchedIds.size() < messageCount && this.clock.millis() - start < waitFor) {
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                this.prefetchIds();
            }
            // Read the queue size exactly once: reading it twice (compare, then use) lets a
            // concurrent prefetch grow the queue in between and yield a count > messageCount.
            int numToPop = Math.min(this.prefetchedIds.size(), messageCount);
            return this.atomicBulkPopHelper(numToPop, this.prefetchedIds, true);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            sw.stop();
        }
    }

    /**
     * Pops up to {@code messageCount} messages using ids prefetched for every shard in
     * {@code allShards} (the helper is called with {@code localShardOnly = false}).
     *
     * <p>The method requests {@code messageCount} more ids to be prefetched across all shards,
     * then re-runs {@code prefetchIdsAllShards()} every 200 ms until enough ids are available
     * or the wait budget is used up. Ids are then drained shard by shard, in {@code allShards}
     * order, into a temporary queue handed to {@code atomicBulkPopHelper}.
     *
     * @param messageCount maximum number of messages to pop; values below 1 return an empty list
     * @param wait         maximum time to wait for enough prefetched ids
     * @param unit         unit of {@code wait}
     * @return the popped messages, possibly fewer than {@code messageCount}
     * @throws RuntimeException wrapping any exception raised while prefetching or popping
     */
    public List<Message> unsafeBulkPop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch sw = this.monitor.start(this.monitor.pop, messageCount);
        try {
            long start = this.clock.millis();
            long waitFor = unit.toMillis(wait);
            this.unsafeNumIdsToPrefetchAllShards.addAndGet(messageCount);
            this.prefetchIdsAllShards();
            while (this.unsafeGetNumPrefetchedIds() < messageCount && this.clock.millis() - start < waitFor) {
                Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
                this.prefetchIdsAllShards();
            }
            // Single read of the prefetched count so the bound cannot exceed messageCount.
            int numToPop = Math.min(this.unsafeGetNumPrefetchedIds(), messageCount);
            ConcurrentLinkedQueue<String> messageIds = new ConcurrentLinkedQueue<String>();
            int numCollected = 0;
            for (String shard : this.allShards) {
                // "<" / ">=" rather than "==" so a zero target never drains a shard.
                if (numCollected >= numToPop) {
                    break;
                }
                String queueShardName = this.getQueueShardKey(this.queueName, shard);
                int available = this.unsafePrefetchedIdsAllShardsMap.get(queueShardName).size();
                for (int i = 0; i < available && numCollected < numToPop; ++i) {
                    String id = this.unsafePrefetchedIdsAllShardsMap.get(queueShardName).poll();
                    if (id == null) {
                        // Shard queue emptied since size() was read; ConcurrentLinkedQueue rejects null.
                        break;
                    }
                    messageIds.add(id);
                    ++numCollected;
                }
            }
            // Pass the number of ids actually collected, which may be below numToPop.
            return this.atomicBulkPopHelper(numCollected, messageIds, false);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            sw.stop();
        }
    }

    /**
     * Takes up to {@code messageCount} ids from {@code prefetchedIdQueue} and pops them with a
     * single Lua script evaluated on {@code quorumConn}.
     *
     * <p>For each id the script looks up its score in the queue shard(s); when the score is not
     * greater than "now + 1 ms" and a payload exists in the message hash, the id is added to the
     * matching unack shard (score "now + unackTime"), removed from the queue shard, and its
     * payload is returned.
     *
     * <p>NOTE(review): both scripts {@code return {}} as soon as one id is missing from the
     * queue shard(s), even if earlier ids in the same call were already moved to the unack
     * shard. That behaviour is kept as-is.
     *
     * @param messageCount      maximum number of ids to take from {@code prefetchedIdQueue}
     * @param prefetchedIdQueue source of candidate message ids; polled, so ids are consumed
     * @param localShardOnly    {@code true} to look only at this instance's shard,
     *                          {@code false} to search every shard in {@code allShards}
     * @return the deserialized messages the script returned (a mutable list, possibly empty)
     * @throws IOException if a returned payload cannot be deserialized
     */
    private List<Message> atomicBulkPopHelper(int messageCount, ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) throws IOException {
        double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();
        double unackScore = Long.valueOf(this.clock.millis() + (long)this.unackTime).doubleValue();
        NumberFormat fmt = NumberFormat.getIntegerInstance();
        fmt.setGroupingUsed(false);
        String nowScoreString = fmt.format(now);
        String unackScoreString = fmt.format(unackScore);

        // poll() returns null once the queue is empty (e.g. another caller consumed ids first);
        // stop there instead of forwarding nulls, which ImmutableList.Builder rejects.
        List<String> messageIds = new ArrayList<>();
        for (int i = 0; i < messageCount; ++i) {
            String id = prefetchedIdQueue.poll();
            if (id == null) {
                break;
            }
            messageIds.add(id);
        }
        int numMessages = messageIds.size();
        if (numMessages == 0) {
            // Both scripts return an empty table for zero ids, so skip the call.
            return new ArrayList<>();
        }

        String script;
        ImmutableList.Builder<String> args = ImmutableList.builder();
        args.add(Integer.toString(numMessages));
        if (localShardOnly) {
            // ZSCORE reaches Lua as a string and ARGV values are strings, so the score check
            // converts both sides with tonumber() to compare numerically, not lexicographically.
            script = "local hkey=KEYS[1]\n"
                    + "local num_msgs=ARGV[1]\n"
                    + "local peek_until=tonumber(ARGV[2])\n"
                    + "local unack_score=ARGV[3]\n"
                    + "local queue_shard_name=ARGV[4]\n"
                    + "local unack_shard_name=ARGV[5]\n"
                    + "local msg_start_idx = 6\n"
                    + "local idx = 1\n"
                    + "local return_vals={}\n"
                    + "for i=0,num_msgs-1 do\n"
                    + "  local message_id=ARGV[msg_start_idx + i]\n"
                    + "  local exists = redis.call('zscore', queue_shard_name, message_id)\n"
                    + "  if (exists) then\n"
                    + "    if (tonumber(exists) <=peek_until) then\n"
                    + "      local value = redis.call('hget', hkey, message_id)\n"
                    + "      if (value) then\n"
                    + "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n"
                    + "        if (zadd_ret) then\n"
                    + "          redis.call('zrem', queue_shard_name, message_id)\n"
                    + "          return_vals[idx]=value\n"
                    + "          idx=idx+1\n"
                    + "        end\n"
                    + "      end\n"
                    + "    end\n"
                    + "  else\n"
                    + "    return {}\n"
                    + "  end\n"
                    + "end\n"
                    + "return return_vals";
            args.add(nowScoreString);
            args.add(unackScoreString);
            args.add(this.localQueueShard);
            args.add(this.getUnackKey(this.queueName, this.shardName));
        } else {
            script = "local hkey=KEYS[1]\n"
                    + "local num_msgs=ARGV[1]\n"
                    + "local num_shards=ARGV[2]\n"
                    + "local peek_until=tonumber(ARGV[3])\n"
                    + "local unack_score=ARGV[4]\n"
                    + "local shard_start_idx = 5\n"
                    + "local msg_start_idx = 5 + (num_shards * 2)\n"
                    + "local out_idx = 1\n"
                    + "local return_vals={}\n"
                    + "for i=0,num_msgs-1 do\n"
                    + "  local found_msg=false\n"
                    + "  local message_id=ARGV[msg_start_idx + i]\n"
                    + "  for j=0,num_shards-1 do\n"
                    + "    local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n"
                    + "    local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n"
                    + "    local exists = redis.call('zscore', queue_shard_name, message_id)\n"
                    + "    if (exists) then\n"
                    + "      found_msg=true\n"
                    + "      if (tonumber(exists) <=peek_until) then\n"
                    + "        local value = redis.call('hget', hkey, message_id)\n"
                    + "        if (value) then\n"
                    + "          local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n"
                    + "          if (zadd_ret) then\n"
                    + "            redis.call('zrem', queue_shard_name, message_id)\n"
                    + "            return_vals[out_idx]=value\n"
                    + "            out_idx=out_idx+1\n"
                    + "            break\n"
                    + "          end\n"
                    + "        end\n"
                    + "      end\n"
                    + "    end\n"
                    + "  end\n"
                    + "  if (found_msg == false) then\n"
                    + "    return {}\n"
                    + "  end\n"
                    + "end\n"
                    + "return return_vals";
            args.add(Integer.toString(this.allShards.size()));
            args.add(nowScoreString);
            args.add(unackScoreString);
            // Shards are passed as (queue key, unack key) pairs, ahead of the message ids.
            for (String shard : this.allShards) {
                args.add(this.getQueueShardKey(this.queueName, shard));
                args.add(this.getUnackKey(this.queueName, shard));
            }
        }
        args.addAll(messageIds);

        @SuppressWarnings("unchecked") // the scripts return a flat table of payload strings
        List<String> jsonPayloads = (List<String>)((DynoJedisClient)this.quorumConn).eval(script, Collections.singletonList(this.messageStoreKey), args.build());
        List<Message> payloads = new ArrayList<>(jsonPayloads.size());
        for (String json : jsonPayloads) {
            payloads.add(this.om.readValue(json, Message.class));
        }
        return payloads;
    }

    /**
     * Pops the message with the given id using a single Lua script evaluated on
     * {@code quorumConn}.
     *
     * <p>The script walks the supplied (queue shard, unack shard) pairs. For the first shard
     * where the id has a score not greater than "now + 1 ms" and a payload exists in the
     * message hash, it adds the id to that shard's unack set (score "now + unackTime"),
     * removes it from the queue shard and returns the payload.
     *
     * @param messageId      id of the message to pop
     * @param localShardOnly {@code true} to look only at this instance's shard,
     *                       {@code false} to search every shard in {@code allShards}
     * @return the stored payload string, or {@code null} when the script returns nil
     */
    private String atomicPopWithMsgIdHelper(String messageId, boolean localShardOnly) {
        double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();
        double unackScore = Long.valueOf(this.clock.millis() + (long)this.unackTime).doubleValue();
        NumberFormat fmt = NumberFormat.getIntegerInstance();
        fmt.setGroupingUsed(false);
        String nowScoreString = fmt.format(now);
        String unackScoreString = fmt.format(unackScore);
        // ZSCORE reaches Lua as a string and ARGV values are strings, so the score check
        // converts both sides with tonumber() to compare numerically, not lexicographically.
        String atomicPopScript = "local hkey=KEYS[1]\n"
                + "local message_id=ARGV[1]\n"
                + "local num_shards=ARGV[2]\n"
                + "local peek_until=tonumber(ARGV[3])\n"
                + "local unack_score=ARGV[4]\n"
                + "for i=0,num_shards-1 do\n"
                + "  local queue_shard_name=ARGV[(i*2)+5]\n"
                + "  local unack_shard_name=ARGV[(i*2)+5+1]\n"
                + "  local exists = redis.call('zscore', queue_shard_name, message_id)\n"
                + "  if (exists) then\n"
                + "    if (tonumber(exists) <= peek_until) then\n"
                + "      local value = redis.call('hget', hkey, message_id)\n"
                + "      if (value) then\n"
                + "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id )\n"
                + "        if (zadd_ret) then\n"
                + "          redis.call('zrem', queue_shard_name, message_id)\n"
                + "          return value\n"
                + "        end\n"
                + "      end\n"
                + "    end\n"
                + "  end\n"
                + "end\n"
                + "return nil";

        // ARGV layout: id, shard count, peek-until score, unack score, then (queue, unack) key pairs.
        ImmutableList.Builder<String> args = ImmutableList.builder();
        args.add(messageId);
        if (localShardOnly) {
            args.add(Integer.toString(1));
            args.add(nowScoreString);
            args.add(unackScoreString);
            args.add(this.localQueueShard);
            args.add(this.getUnackKey(this.queueName, this.shardName));
        } else {
            args.add(Integer.toString(this.allShards.size()));
            args.add(nowScoreString);
            args.add(unackScoreString);
            for (String shard : this.allShards) {
                args.add(this.getQueueShardKey(this.queueName, shard));
                args.add(this.getUnackKey(this.queueName, shard));
            }
        }
        return (String)((DynoJedisClient)this.quorumConn).eval(atomicPopScript, Collections.singletonList(this.messageStoreKey), args.build());
    }

    /**
     * Reads the stored payload for {@code messageId} from the message hash through
     * {@code quorumConn} and deserializes it into a {@link Message}.
     *
     * @param messageId hash field to look up
     * @return the deserialized message, or {@code null} (after a warning is logged) when the
     *         hash has no entry for the id
     */
    public Message get(String messageId) {
        final Stopwatch timer = this.monitor.get.start();
        try {
            return QueueUtils.execute("get", this.messageStoreKey, () -> {
                final String payload = this.quorumConn.hget(this.messageStoreKey, messageId);
                if (payload != null) {
                    return this.om.readValue(payload, Message.class);
                }
                this.logger.warn("Cannot get the message payload " + messageId);
                return null;
            });
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Fetches every entry of the message hash through {@code nonQuorumConn} and wraps each
     * one in a {@link Message}.
     *
     * <p>The hash field is passed as the first {@code Message} constructor argument and the
     * hash value as the second; the value is not JSON-deserialized here.
     *
     * @return a new mutable list with one message per hash entry
     */
    public List<Message> getAllMessages() {
        final Map<String, String> stored = this.nonQuorumConn.hgetAll(this.messageStoreKey);
        final List<Message> messages = new ArrayList<>();
        stored.forEach((field, value) -> messages.add(new Message(field, value)));
        return messages;
    }

    /**
     * Same lookup as {@code get}, but the hash is read through {@code nonQuorumConn}.
     *
     * @param messageId hash field to look up
     * @return the deserialized message, or {@code null} (after a warning is logged) when the
     *         hash has no entry for the id
     */
    public Message localGet(String messageId) {
        final Stopwatch timer = this.monitor.get.start();
        try {
            return QueueUtils.execute("localGet", this.messageStoreKey, () -> {
                final String raw = this.nonQuorumConn.hget(this.messageStoreKey, messageId);
                if (raw == null) {
                    this.logger.warn("Cannot get the message payload " + messageId);
                    return null;
                }
                final Message parsed = this.om.readValue(raw, Message.class);
                return parsed;
            });
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Sums the cardinality ({@code ZCARD}) of the queue sorted set of every shard in
     * {@code allShards}, read through {@code nonQuorumConn}.
     *
     * @return total number of entries across all queue shards
     */
    public long size() {
        final Stopwatch timer = this.monitor.size.start();
        try {
            return QueueUtils.execute("size", "(a shard in) " + this.queueName, () -> this.allShards.stream()
                    .map(shard -> this.getQueueShardKey(this.queueName, shard))
                    .mapToLong(queueShardKey -> this.nonQuorumConn.zcard(queueShardKey).longValue())
                    .sum());
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Reports, per shard, the cardinality of its queue set and of its unack set, both read
     * through {@code nonQuorumConn}.
     *
     * @return map from shard name to a map with the keys {@code "size"} (queue set
     *         cardinality) and {@code "uacked"} (unack set cardinality)
     */
    public Map<String, Map<String, Long>> shardSizes() {
        final Stopwatch timer = this.monitor.size.start();
        final Map<String, Map<String, Long>> sizesByShard = new HashMap<>();
        try {
            return QueueUtils.execute("shardSizes", "(a shard in) " + this.queueName, () -> {
                for (final String shard : this.allShards) {
                    final long queued = this.nonQuorumConn.zcard(this.getQueueShardKey(this.queueName, shard));
                    final long unacked = this.nonQuorumConn.zcard(this.getUnackKey(this.queueName, shard));
                    final Map<String, Long> details = new HashMap<>();
                    details.put("size", queued);
                    details.put("uacked", unacked);
                    sizesByShard.put(shard, details);
                }
                return sizesByShard;
            });
        }
        finally {
            timer.stop();
        }
    }

    /**
     * Deletes, through {@code quorumConn}, the queue key and unack key of every shard in
     * {@code allShards}, and finally the message hash itself.
     */
    public void clear() {
        QueueUtils.execute("clear", "(a shard in) " + this.queueName, () -> {
            this.allShards.forEach(shard -> {
                this.quorumConn.del(this.getQueueShardKey(this.queueName, shard));
                this.quorumConn.del(this.getUnackKey(this.queueName, shard));
            });
            this.quorumConn.del(this.messageStoreKey);
            return null;
        });
    }

    /**
     * Moves expired entries of this instance's unack set back to its queue shard, one Redis
     * command at a time (not atomic).
     *
     * <p>Records the current {@code size()} as queue depth, then reads up to 1000 unack
     * entries whose score is between 0 and the current clock time. For each entry:
     * <ul>
     *   <li>if the message hash has no payload for it, the entry is removed from the unack
     *       set and counted as stale;</li>
     *   <li>otherwise it is added to the local queue shard with its original score and then
     *       removed from the unack set.</li>
     * </ul>
     *
     * <p>Exceptions are logged (with stack trace) and not propagated.
     */
    public void processUnacks() {
        this.logger.info("processUnacks() will NOT be atomic.");
        Stopwatch sw = this.monitor.processUnack.start();
        try {
            long queueDepth = this.size();
            this.monitor.queueDepth.record(queueDepth);
            String unackShardName = this.getUnackKey(this.queueName, this.shardName);
            QueueUtils.execute("processUnacks", unackShardName, () -> {
                int batchSize = 1000;
                double now = Long.valueOf(this.clock.millis()).doubleValue();
                int numMovedBack = 0;
                int numStale = 0;
                Set<Tuple> unacks = this.nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0.0, now, 0, batchSize);
                if (unacks.size() > 0) {
                    this.logger.info("processUnacks: Attempting to add " + unacks.size() + " messages back to shard of queue: " + unackShardName);
                }
                for (Tuple unack : unacks) {
                    double score = unack.getScore();
                    String member = unack.getElement();
                    String payload = this.quorumConn.hget(this.messageStoreKey, member);
                    if (payload == null) {
                        // No payload left in the hash: drop the dangling unack entry.
                        this.quorumConn.zrem(unackShardName, member);
                        ++numStale;
                        continue;
                    }
                    long addedBack = this.quorumConn.zadd(this.localQueueShard, score, member);
                    long removedFromUnack = this.quorumConn.zrem(unackShardName, member);
                    // Count only entries that were newly added to the queue and actually removed from unack.
                    if (addedBack > 0L && removedFromUnack > 0L) {
                        ++numMovedBack;
                    }
                }
                if (numMovedBack > 0 || numStale > 0) {
                    this.logger.info("processUnacks: Moved back " + numMovedBack + " items. Got rid of " + numStale + " stale items.");
                }
                return null;
            });
        }
        catch (Exception e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            this.logger.error("Error while processing unacks. " + e.getMessage(), e);
        }
        finally {
            sw.stop();
        }
    }

    /**
     * Scans every shard for queue entries that look stale and returns their ids.
     *
     * <p>For each shard, up to 10 ids with a score between 0 and the current clock time are
     * read from the queue set through {@code nonQuorumConn}. A Lua script evaluated on
     * {@code quorumConn} then reports an id when it is not both in the message hash and in
     * the queue set, and is also absent from the shard's unack set.
     *
     * @return messages carrying only the id of each reported entry (no payload is set here)
     */
    public List<Message> findStaleMessages() {
        return QueueUtils.execute("findStaleMessages", this.localQueueShard, () -> {
            final String findStaleMsgsScript = "local hkey=KEYS[1]\n"
                    + "local queue_shard=ARGV[1]\n"
                    + "local unack_shard=ARGV[2]\n"
                    + "local num_msgs=ARGV[3]\n"
                    + "\n"
                    + "local stale_msgs={}\n"
                    + "local num_stale_idx = 1\n"
                    + "for i=0,num_msgs-1 do\n"
                    + "  local msg_id=ARGV[4+i]\n"
                    + "\n"
                    + "  local exists_hash = redis.call('hget', hkey, msg_id)\n"
                    + "  local exists_queue = redis.call('zscore', queue_shard, msg_id)\n"
                    + "  local exists_unack = redis.call('zscore', unack_shard, msg_id)\n"
                    + "\n"
                    + "  if (exists_hash and exists_queue) then\n"
                    + "  elseif (not (exists_unack)) then\n"
                    + "    stale_msgs[num_stale_idx] = msg_id\n"
                    + "    num_stale_idx = num_stale_idx + 1\n"
                    + "  end\n"
                    + "end\n"
                    + "\n"
                    + "return stale_msgs\n";
            final int batchSize = 10;
            final double now = Long.valueOf(this.clock.millis()).doubleValue();
            final ArrayList<Message> staleMessages = new ArrayList<>();
            for (final String shard : this.allShards) {
                final String queueShardName = this.getQueueShardKey(this.queueName, shard);
                final Set<String> candidates = this.nonQuorumConn.zrangeByScore(queueShardName, 0.0, now, 0, batchSize);
                if (candidates.isEmpty()) {
                    continue;
                }
                // ARGV layout: queue key, unack key, candidate count, then the candidate ids.
                final ImmutableList.Builder<String> args = ImmutableList.builder();
                args.add(queueShardName);
                args.add(this.getUnackKey(this.queueName, shard));
                args.add(Integer.toString(candidates.size()));
                args.addAll(candidates);
                @SuppressWarnings("unchecked")
                final ArrayList<String> staleIds = (ArrayList<String>)((DynoJedisClient)this.quorumConn).eval(findStaleMsgsScript, Collections.singletonList(this.messageStoreKey), args.build());
                final long staleCount = staleIds.size();
                if (staleCount > 0L) {
                    this.logger.info("findStaleMsgs(): Found " + staleCount + " messages present in queue but not in hashmap");
                }
                for (final String staleId : staleIds) {
                    final Message placeholder = new Message();
                    placeholder.setId(staleId);
                    staleMessages.add(placeholder);
                }
            }
            return staleMessages;
        });
    }

    /**
     * Moves expired entries of this instance's unack set back to its queue shard using a
     * single Lua script evaluated on {@code quorumConn}.
     *
     * <p>Records the current {@code size()} as queue depth, then reads up to 1000 unack
     * entries whose score is between 0 and the current clock time through
     * {@code nonQuorumConn}. The script re-queues every entry that still has a payload in the
     * message hash and removes it from the unack set; entries without a payload are only
     * removed from the unack set. The script returns {@code {num_moved, num_stale}}.
     *
     * <p>NOTE(review): scores are sent through an integer {@link NumberFormat}, so any
     * fractional part of a score is dropped when the entry is re-queued; the non-atomic
     * {@code processUnacks()} passes the score as a double. Confirm this is intended.
     *
     * <p>Exceptions are logged (with stack trace) and not propagated.
     */
    public void atomicProcessUnacks() {
        this.logger.info("processUnacks() will be atomic.");
        Stopwatch sw = this.monitor.processUnack.start();
        try {
            long queueDepth = this.size();
            this.monitor.queueDepth.record(queueDepth);
            String unackShardName = this.getUnackKey(this.queueName, this.shardName);
            QueueUtils.execute("processUnacks", unackShardName, () -> {
                int batchSize = 1000;
                double now = Long.valueOf(this.clock.millis()).doubleValue();
                Set<Tuple> unacks = this.nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0.0, now, 0, batchSize);
                if (unacks.isEmpty()) {
                    return null;
                }
                this.logger.info("processUnacks: Attempting to add " + unacks.size() + " messages back to shard of queue: " + unackShardName);
                String atomicProcessUnacksScript = "local hkey=KEYS[1]\n"
                        + "local unack_shard=ARGV[1]\n"
                        + "local queue_shard=ARGV[2]\n"
                        + "local num_unacks=ARGV[3]\n"
                        + "\n"
                        + "local unacks={}\n"
                        + "local unack_scores={}\n"
                        + "local unack_start_idx = 4\n"
                        + "for i=0,num_unacks-1 do\n"
                        + "  unacks[i]=ARGV[4 + (i*2)]\n"
                        + "  unack_scores[i]=ARGV[4+(i*2)+1]\n"
                        + "end\n"
                        + "\n"
                        + "local num_moved=0\n"
                        + "local num_stale=0\n"
                        + "for i=0,num_unacks-1 do\n"
                        + "  local mem_val = redis.call('hget', hkey, unacks[i])\n"
                        + "  if (mem_val) then\n"
                        + "    redis.call('zadd', queue_shard, unack_scores[i], unacks[i])\n"
                        + "    redis.call('zrem', unack_shard, unacks[i])\n"
                        + "    num_moved=num_moved+1\n"
                        + "  else\n"
                        + "    redis.call('zrem', unack_shard, unacks[i])\n"
                        + "    num_stale=num_stale+1\n"
                        + "  end\n"
                        + "end\n"
                        + "\n"
                        + "return {num_moved, num_stale}\n";
                // One formatter for the whole batch instead of one per entry.
                NumberFormat fmt = NumberFormat.getIntegerInstance();
                fmt.setGroupingUsed(false);
                // ARGV layout: unack key, queue key, entry count, then (member, score) pairs.
                ImmutableList.Builder<String> args = ImmutableList.builder();
                args.add(unackShardName);
                args.add(this.localQueueShard);
                args.add(Integer.toString(unacks.size()));
                for (Tuple unack : unacks) {
                    args.add(unack.getElement());
                    args.add(fmt.format(unack.getScore()));
                }
                @SuppressWarnings("unchecked") // the script returns {num_moved, num_stale}
                ArrayList<Long> counts = (ArrayList<Long>)((DynoJedisClient)this.quorumConn).eval(atomicProcessUnacksScript, Collections.singletonList(this.messageStoreKey), args.build());
                long numMovedBack = counts.get(0);
                long numStale = counts.get(1);
                if (numMovedBack > 0L || numStale > 0L) {
                    this.logger.info("processUnacks: Moved back " + numMovedBack + " items. Got rid of " + numStale + " stale items.");
                }
                return null;
            });
        }
        catch (Exception e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            this.logger.error("Error while processing unacks. " + e.getMessage(), e);
        }
        finally {
            sw.stop();
        }
    }

    /**
     * Builds the Redis key of a queue shard:
     * {@code <redisKeyPrefix>.QUEUE.<queueName>.<shard>}.
     *
     * @param queueName queue name component of the key
     * @param shard     shard name component of the key
     * @return the assembled key
     */
    private String getQueueShardKey(String queueName, String shard) {
        final StringBuilder key = new StringBuilder();
        key.append(this.redisKeyPrefix);
        key.append(".QUEUE.");
        key.append(queueName);
        key.append(".");
        key.append(shard);
        return key.toString();
    }

    /**
     * Builds the Redis key of a shard's unack set:
     * {@code <redisKeyPrefix>.UNACK.<queueName>.<shard>}.
     *
     * @param queueName queue name component of the key
     * @param shard     shard name component of the key
     * @return the assembled key
     */
    private String getUnackKey(String queueName, String shard) {
        final StringBuilder key = new StringBuilder();
        key.append(this.redisKeyPrefix);
        key.append(".UNACK.");
        key.append(queueName);
        key.append(".");
        key.append(shard);
        return key.toString();
    }

    /**
     * Shuts down {@code schedulerForUnacksProcessing} and then closes the queue monitor.
     *
     * <p>{@code shutdown()} is the non-blocking variant: this method does not wait for any
     * task that is already running to finish.
     *
     * @throws IOException declared by this method; presumably raised by
     *                     {@code monitor.close()} — confirm against {@code QueueMonitor}
     */
    public void close() throws IOException {
        this.schedulerForUnacksProcessing.shutdown();
        this.monitor.close();
    }
}

