/*
 * Decompiled with CFR 0.152.
 */
package io.orkes.conductor.mq.redis;

import com.google.common.util.concurrent.Uninterruptibles;
import io.orkes.conductor.mq.QueueMessage;
import java.math.BigDecimal;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class QueueMonitor {
    private static final Logger log = LoggerFactory.getLogger(QueueMonitor.class);
    private static final BigDecimal HUNDRED = new BigDecimal(100);
    private final Clock clock;
    private final LinkedBlockingQueue<QueueMessage> peekedMessages;
    private final String queueName;
    private int queueUnackTime = 30000;
    private long size = 0L;
    private int maxPollCount = 100;

    public void setMaxPollCount(int maxPollCount) {
        this.maxPollCount = maxPollCount;
    }

    public int getMaxPollCount() {
        return this.maxPollCount;
    }

    public QueueMonitor(String queueName) {
        this.queueName = queueName;
        this.clock = Clock.systemDefaultZone();
        this.peekedMessages = new LinkedBlockingQueue();
    }

    public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
        if (count <= 0) {
            log.warn("Negative poll count {}", (Object)count);
            return new ArrayList<QueueMessage>();
        }
        ArrayList<QueueMessage> messages = new ArrayList<QueueMessage>();
        if (count > this.maxPollCount) {
            count = this.maxPollCount;
        }
        this.__peekedMessages(count);
        long now = this.clock.millis();
        boolean waited = false;
        for (int i = 0; i < count; ++i) {
            try {
                QueueMessage message = this.peekedMessages.poll();
                if (message == null) {
                    if (!waited && waitTime > 0) {
                        Uninterruptibles.sleepUninterruptibly((long)waitTime, (TimeUnit)timeUnit);
                        waited = true;
                        continue;
                    }
                    return messages;
                }
                if (now > message.getExpiry()) {
                    this.peekedMessages.clear();
                    return new ArrayList<QueueMessage>();
                }
                messages.add(message);
                continue;
            }
            catch (Exception e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
        return messages;
    }

    public int getQueueUnackTime() {
        return this.queueUnackTime;
    }

    public void setQueueUnackTime(int queueUnackTime) {
        this.queueUnackTime = queueUnackTime;
    }

    protected abstract List<String> pollMessages(double var1, double var3, int var5);

    protected abstract long queueSize();

    private synchronized void __peekedMessages(int count) {
        try {
            log.trace("Polling {} messages from {} with size {}", new Object[]{count, this.queueName, this.size});
            double now = Long.valueOf(this.clock.millis() + 1L).doubleValue();
            double maxTime = now + (double)this.queueUnackTime;
            long messageExpiry = (long)now + (long)this.queueUnackTime;
            List<String> response = this.pollMessages(now, maxTime, count);
            if (response == null) {
                return;
            }
            for (int i = 0; i < response.size(); i += 2) {
                long timeout = 0L;
                String id = response.get(i);
                String scoreString = response.get(i + 1);
                int priority = new BigDecimal(scoreString).remainder(BigDecimal.ONE).multiply(HUNDRED).intValue();
                QueueMessage message = new QueueMessage(id, "", timeout, priority);
                message.setExpiry(messageExpiry);
                this.peekedMessages.add(message);
            }
        }
        catch (Throwable t) {
            log.warn(t.getMessage(), t);
        }
    }
}

