/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.queues.redis.v2;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.dyno.connectionpool.HashPartitioner;
import com.netflix.dyno.connectionpool.impl.hash.Murmur3HashPartitioner;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.dyno.queues.redis.QueueMonitor;
import com.netflix.dyno.queues.redis.QueueUtils;
import com.netflix.dyno.queues.redis.conn.Pipe;
import com.netflix.dyno.queues.redis.conn.RedisConnection;
import com.netflix.servo.monitor.Stopwatch;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Response;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.ZAddParams;

public class RedisPipelineQueue
implements DynoQueue {
    private final Logger logger = LoggerFactory.getLogger(RedisPipelineQueue.class);
    private final Clock clock;
    private final String queueName;
    private final String shardName;
    private final String messageStoreKeyPrefix;
    private final String myQueueShard;
    private final String unackShardKeyPrefix;
    private final int unackTime;
    private final QueueMonitor monitor;
    private final ObjectMapper om;
    private final RedisConnection connPool;
    private volatile RedisConnection nonQuorumPool;
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    private final HashPartitioner partitioner = new Murmur3HashPartitioner();
    private final int maxHashBuckets = 32;
    private final int longPollWaitIntervalInMillis = 10;

    public RedisPipelineQueue(String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, RedisConnection pool) {
        this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, shardName, unackScheduleInMS, unackTime, pool);
    }

    public RedisPipelineQueue(Clock clock, String redisKeyPrefix, String queue, String shardName, int unackScheduleInMS, int unackTime, RedisConnection pool) {
        this.clock = clock;
        this.queueName = queue;
        String qName = "{" + queue + "." + shardName + "}";
        this.shardName = shardName;
        this.messageStoreKeyPrefix = redisKeyPrefix + ".MSG." + qName;
        this.myQueueShard = redisKeyPrefix + ".QUEUE." + qName;
        this.unackShardKeyPrefix = redisKeyPrefix + ".UNACK." + qName + ".";
        this.unackTime = unackTime;
        this.connPool = pool;
        this.nonQuorumPool = pool;
        this.om = QueueUtils.constructObjectMapper();
        this.monitor = new QueueMonitor(qName, shardName);
        this.schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
        this.schedulerForUnacksProcessing.scheduleAtFixedRate(() -> this.processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
        this.logger.info(RedisPipelineQueue.class.getName() + " is ready to serve " + qName + ", shard=" + shardName);
    }

    public void setNonQuorumPool(RedisConnection nonQuorumPool) {
        this.nonQuorumPool = nonQuorumPool;
    }

    public String getName() {
        return this.queueName;
    }

    public int getUnackTime() {
        return this.unackTime;
    }

    public List<String> push(List<Message> messages) {
        Stopwatch sw = this.monitor.start(this.monitor.push, messages.size());
        RedisConnection conn = this.connPool.getResource();
        try {
            Pipe pipe = conn.pipelined();
            for (Message message : messages) {
                String json = this.om.writeValueAsString((Object)message);
                pipe.hset(this.messageStoreKey(message.getId()), message.getId(), json);
                double priority = (double)message.getPriority() / 100.0;
                double score = Long.valueOf(this.clock.millis() + message.getTimeout()).doubleValue() + priority;
                pipe.zadd(this.myQueueShard, score, message.getId());
            }
            pipe.sync();
            pipe.close();
            List list = messages.stream().map(msg -> msg.getId()).collect(Collectors.toList());
            return list;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            conn.close();
            sw.stop();
        }
    }

    private String messageStoreKey(String msgId) {
        Long hash = this.partitioner.hash(msgId);
        long bucket = hash % 32L;
        return this.messageStoreKeyPrefix + "." + bucket;
    }

    private String unackShardKey(String messageId) {
        Long hash = this.partitioner.hash(messageId);
        long bucket = hash % 32L;
        return this.unackShardKeyPrefix + bucket;
    }

    public List<Message> peek(int messageCount) {
        Stopwatch sw = this.monitor.peek.start();
        RedisConnection jedis = this.connPool.getResource();
        try {
            Set<String> ids = this.peekIds(0, messageCount);
            if (ids == null) {
                List<Message> list = Collections.emptyList();
                return list;
            }
            LinkedList<Message> messages = new LinkedList<Message>();
            for (String id : ids) {
                String json = jedis.hget(this.messageStoreKey(id), id);
                Message message = (Message)this.om.readValue(json, Message.class);
                messages.add(message);
            }
            LinkedList<Message> linkedList = messages;
            return linkedList;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    public synchronized List<Message> pop(int messageCount, int wait, TimeUnit unit) {
        if (messageCount < 1) {
            return Collections.emptyList();
        }
        Stopwatch sw = this.monitor.start(this.monitor.pop, messageCount);
        List<Message> messages = new LinkedList<Message>();
        int remaining = messageCount;
        long time = this.clock.millis() + unit.toMillis(wait);
        try {
            List<String> peeked;
            do {
                List<Message> popped;
                int poppedCount;
                if ((poppedCount = (popped = this._pop(peeked = this.peekIds(0, remaining).stream().collect(Collectors.toList()))).size()) == messageCount) {
                    messages = popped;
                    break;
                }
                messages.addAll(popped);
                remaining -= poppedCount;
                if (this.clock.millis() > time) break;
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException ie) {
                    this.logger.error(ie.getMessage(), (Throwable)ie);
                }
            } while (remaining > 0);
            peeked = messages;
            return peeked;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            sw.stop();
        }
    }

    public Message popWithMsgId(String messageId) {
        throw new UnsupportedOperationException();
    }

    public Message unsafePopWithMsgIdAllShards(String messageId) {
        throw new UnsupportedOperationException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<Message> _pop(List<String> batch) throws Exception {
        double unackScore = Long.valueOf(this.clock.millis() + (long)this.unackTime).doubleValue();
        LinkedList<Message> popped = new LinkedList<Message>();
        ZAddParams zParams = ZAddParams.zAddParams().nx();
        try (RedisConnection jedis = this.connPool.getResource();){
            int i;
            String msgId;
            Pipe pipe = jedis.pipelined();
            ArrayList<Response<Long>> zadds = new ArrayList<Response<Long>>(batch.size());
            for (int i2 = 0; i2 < batch.size() && (msgId = batch.get(i2)) != null; ++i2) {
                zadds.add(pipe.zadd(this.unackShardKey(msgId), unackScore, msgId, zParams));
            }
            pipe.sync();
            pipe = jedis.pipelined();
            int count = zadds.size();
            ArrayList<String> zremIds = new ArrayList<String>(count);
            LinkedList<Response<Long>> zremRes = new LinkedList<Response<Long>>();
            for (int i3 = 0; i3 < count; ++i3) {
                long added = (Long)((Response)zadds.get(i3)).get();
                if (added == 0L) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot add {} to unack queue shard", (Object)batch.get(i3));
                    }
                    this.monitor.misses.increment();
                    continue;
                }
                String id = batch.get(i3);
                zremIds.add(id);
                zremRes.add(pipe.zrem(this.myQueueShard, id));
            }
            pipe.sync();
            pipe = jedis.pipelined();
            ArrayList<Response<String>> getRes = new ArrayList<Response<String>>(count);
            for (i = 0; i < zremRes.size(); ++i) {
                long removed = (Long)((Response)zremRes.get(i)).get();
                if (removed == 0L) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot remove {} from queue shard", zremIds.get(i));
                    }
                    this.monitor.misses.increment();
                    continue;
                }
                getRes.add(pipe.hget(this.messageStoreKey((String)zremIds.get(i)), (String)zremIds.get(i)));
            }
            pipe.sync();
            for (i = 0; i < getRes.size(); ++i) {
                String json = (String)((Response)getRes.get(i)).get();
                if (json == null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cannot read payload for {}", zremIds.get(i));
                    }
                    this.monitor.misses.increment();
                    continue;
                }
                Message msg = (Message)this.om.readValue(json, Message.class);
                msg.setShard(this.shardName);
                popped.add(msg);
            }
            LinkedList<Message> linkedList = popped;
            return linkedList;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean ack(String messageId) {
        Stopwatch sw = this.monitor.ack.start();
        RedisConnection jedis = this.connPool.getResource();
        try {
            Long removed = jedis.zrem(this.unackShardKey(messageId), messageId);
            if (removed > 0L) {
                jedis.hdel(this.messageStoreKey(messageId), messageId);
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    public void ack(List<Message> messages) {
        Stopwatch sw = this.monitor.ack.start();
        RedisConnection jedis = this.connPool.getResource();
        Pipe pipe = jedis.pipelined();
        LinkedList<Response<Long>> responses = new LinkedList<Response<Long>>();
        try {
            for (Message msg : messages) {
                responses.add(pipe.zrem(this.unackShardKey(msg.getId()), msg.getId()));
            }
            pipe.sync();
            pipe = jedis.pipelined();
            LinkedList<Response<Long>> dels = new LinkedList<Response<Long>>();
            for (int i = 0; i < messages.size(); ++i) {
                Long removed = (Long)((Response)responses.get(i)).get();
                if (removed <= 0L) continue;
                dels.add(pipe.hdel(this.messageStoreKey(messages.get(i).getId()), messages.get(i).getId()));
            }
            pipe.sync();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean setUnackTimeout(String messageId, long timeout) {
        Stopwatch sw = this.monitor.ack.start();
        RedisConnection jedis = this.connPool.getResource();
        try {
            double unackScore = Long.valueOf(this.clock.millis() + timeout).doubleValue();
            Double score = jedis.zscore(this.unackShardKey(messageId), messageId);
            if (score != null) {
                jedis.zadd(this.unackShardKey(messageId), unackScore, messageId);
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    public boolean setTimeout(String messageId, long timeout) {
        try (RedisConnection jedis = this.connPool.getResource();){
            String json = jedis.hget(this.messageStoreKey(messageId), messageId);
            if (json == null) {
                boolean bl = false;
                return bl;
            }
            Message message = (Message)this.om.readValue(json, Message.class);
            message.setTimeout(timeout);
            Double score = jedis.zscore(this.myQueueShard, messageId);
            if (score != null) {
                double priorityd = (double)message.getPriority() / 100.0;
                double newScore = Long.valueOf(this.clock.millis() + timeout).doubleValue() + priorityd;
                jedis.zadd(this.myQueueShard, newScore, messageId);
                json = this.om.writeValueAsString((Object)message);
                jedis.hset(this.messageStoreKey(message.getId()), message.getId(), json);
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean remove(String messageId) {
        Stopwatch sw = this.monitor.remove.start();
        RedisConnection jedis = this.connPool.getResource();
        try {
            jedis.zrem(this.unackShardKey(messageId), messageId);
            Long removed = jedis.zrem(this.myQueueShard, messageId);
            Long msgRemoved = jedis.hdel(this.messageStoreKey(messageId), messageId);
            if (removed > 0L && msgRemoved > 0L) {
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    public boolean ensure(Message message) {
        throw new UnsupportedOperationException();
    }

    public boolean containsPredicate(String predicate) {
        return this.containsPredicate(predicate, false);
    }

    public String getMsgWithPredicate(String predicate) {
        return this.getMsgWithPredicate(predicate, false);
    }

    public boolean containsPredicate(String predicate, boolean localShardOnly) {
        throw new UnsupportedOperationException();
    }

    public String getMsgWithPredicate(String predicate, boolean localShardOnly) {
        throw new UnsupportedOperationException();
    }

    public Message popMsgWithPredicate(String predicate, boolean localShardOnly) {
        throw new UnsupportedOperationException();
    }

    public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    public List<Message> unsafeBulkPop(int messageCount, int wait, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    public Message get(String messageId) {
        Stopwatch sw = this.monitor.get.start();
        RedisConnection jedis = this.connPool.getResource();
        try {
            Message msg;
            String json = jedis.hget(this.messageStoreKey(messageId), messageId);
            if (json == null) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Cannot get the message payload " + messageId);
                }
                Message message = null;
                return message;
            }
            Message message = msg = (Message)this.om.readValue(json, Message.class);
            return message;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    public Message localGet(String messageId) {
        throw new UnsupportedOperationException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long size() {
        Stopwatch sw = this.monitor.size.start();
        RedisConnection jedis = this.nonQuorumPool.getResource();
        try {
            long size;
            long l = size = jedis.zcard(this.myQueueShard);
            return l;
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<String, Map<String, Long>> shardSizes() {
        Stopwatch sw = this.monitor.size.start();
        HashMap<String, Map<String, Long>> shardSizes = new HashMap<String, Map<String, Long>>();
        RedisConnection jedis = this.nonQuorumPool.getResource();
        try {
            long size = jedis.zcard(this.myQueueShard);
            long uacked = 0L;
            for (int i = 0; i < 32; ++i) {
                String unackShardKey = this.unackShardKeyPrefix + i;
                uacked += jedis.zcard(unackShardKey);
            }
            HashMap<String, Long> shardDetails = new HashMap<String, Long>();
            shardDetails.put("size", size);
            shardDetails.put("uacked", uacked);
            shardSizes.put(this.shardName, shardDetails);
            HashMap<String, Map<String, Long>> hashMap = shardSizes;
            return hashMap;
        }
        finally {
            jedis.close();
            sw.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void clear() {
        try (RedisConnection jedis = this.connPool.getResource();){
            jedis.del(this.myQueueShard);
            for (int bucket = 0; bucket < 32; ++bucket) {
                String unackShardKey = this.unackShardKeyPrefix + bucket;
                jedis.del(unackShardKey);
                String messageStoreKey = this.messageStoreKeyPrefix + "." + bucket;
                jedis.del(messageStoreKey);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<String> peekIds(int offset, int count) {
        try (RedisConnection jedis = this.connPool.getResource();){
            Set<String> scanned;
            double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();
            Set<String> set = scanned = jedis.zrangeByScore(this.myQueueShard, 0, now, offset, count);
            return set;
        }
    }

    public void processUnacks() {
        for (int i = 0; i < 32; ++i) {
            String unackShardKey = this.unackShardKeyPrefix + i;
            this.processUnacks(unackShardKey);
        }
    }

    private void processUnacks(String unackShardKey) {
        Stopwatch sw = this.monitor.processUnack.start();
        RedisConnection jedis2 = this.connPool.getResource();
        try {
            while (true) {
                long queueDepth = this.size();
                this.monitor.queueDepth.record(queueDepth);
                int batchSize = 1000;
                double now = Long.valueOf(this.clock.millis()).doubleValue();
                Set<Tuple> unacks = jedis2.zrangeByScoreWithScores(unackShardKey, 0, now, 0, batchSize);
                if (unacks.size() <= 0) {
                    return;
                }
                this.logger.debug("Adding " + unacks.size() + " messages back to the queue for " + this.queueName);
                LinkedList<Tuple> requeue = new LinkedList<Tuple>();
                for (Tuple unack : unacks) {
                    double score = unack.getScore();
                    String member = unack.getElement();
                    String payload = jedis2.hget(this.messageStoreKey(member), member);
                    if (payload == null) {
                        jedis2.zrem(this.unackShardKey(member), member);
                        continue;
                    }
                    requeue.add(unack);
                }
                Pipe pipe = jedis2.pipelined();
                for (Tuple unack : requeue) {
                    double score = unack.getScore();
                    String member = unack.getElement();
                    pipe.zadd(this.myQueueShard, score, member);
                    pipe.zrem(this.unackShardKey(member), member);
                }
                pipe.sync();
            }
        }
        finally {
            jedis2.close();
            sw.stop();
        }
    }

    public List<Message> getAllMessages() {
        throw new UnsupportedOperationException();
    }

    public void atomicProcessUnacks() {
        throw new UnsupportedOperationException();
    }

    public List<Message> findStaleMessages() {
        throw new UnsupportedOperationException();
    }

    public boolean atomicRemove(String messageId) {
        throw new UnsupportedOperationException();
    }

    public void close() throws IOException {
        this.schedulerForUnacksProcessing.shutdown();
        this.monitor.close();
    }

    public List<Message> unsafePeekAllShards(int messageCount) {
        throw new UnsupportedOperationException();
    }

    public List<Message> unsafePopAllShards(int messageCount, int wait, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }
}

