/*
 * Decompiled with CFR 0.152.
 */
package io.orkes.conductor.mq.redis;

import com.google.common.util.concurrent.Uninterruptibles;
import io.orkes.conductor.mq.QueueMessage;
import java.math.BigDecimal;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers messages fetched from a backing queue so that {@link #pop} can be served from memory.
 *
 * <p>Each {@code pop} call registers its demand in {@code pollCount}; the private fetch routine
 * asks the subclass (via {@link #pollMessages}) for up to that many messages, stores them in an
 * in-memory buffer and subtracts the number buffered from the outstanding demand. Every buffered
 * message carries an expiry of "fetch time + {@code queueUnackTime}" and is discarded instead of
 * being delivered once that expiry has passed.
 *
 * <p>Subclasses supply the actual queue access through {@link #pollMessages} and
 * {@link #queueSize}.
 */
public abstract class QueueMonitor {
    private static final Logger log = LoggerFactory.getLogger(QueueMonitor.class);
    private static final BigDecimal HUNDRED = new BigDecimal(100);
    private final Clock clock;
    /** Messages already fetched from the backing queue but not yet handed to a caller. */
    private final LinkedBlockingQueue<QueueMessage> peekedMessages;
    /** Single worker used for background refills of {@link #peekedMessages}. */
    private final ExecutorService executorService;
    /** Outstanding demand: messages requested via {@link #pop} minus messages fetched. */
    private final AtomicInteger pollCount = new AtomicInteger(0);
    private final String queueName;
    private int queueUnackTime = 30000;
    // Never updated in this class; only reported in the trace log of the fetch routine.
    private long size = 0L;
    /** Upper bound on a single fetch and capacity of the refill task queue. */
    private int maxPollCount = 100;

    /**
     * Creates a monitor for the given queue.
     *
     * @param queueName name of the queue, used for logging
     */
    public QueueMonitor(String queueName) {
        this.queueName = queueName;
        this.clock = Clock.systemDefaultZone();
        this.peekedMessages = new LinkedBlockingQueue<>();
        this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(this.maxPollCount));
    }

    /**
     * Returns up to {@code count} buffered messages.
     *
     * <p>If the buffer is empty a fetch is performed on the calling thread; if it merely holds
     * fewer messages than the outstanding demand, a background fetch is submitted. When the
     * buffer runs dry while collecting, the method waits once for {@code waitTime} and then
     * resumes collecting; the wait does not count against {@code count}. If an expired message
     * is encountered, the whole buffer is discarded and an empty list is returned.
     *
     * @param count maximum number of messages to return
     * @param waitTime how long to wait, once, when the buffer is empty
     * @param timeUnit unit of {@code waitTime}
     * @return the collected messages, possibly empty, never {@code null}
     */
    public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
        List<QueueMessage> messages = new ArrayList<>();
        int pendingCount = this.pollCount.addAndGet(count);
        if (this.peekedMessages.isEmpty()) {
            this.__peekedMessages();
        } else if (this.peekedMessages.size() < pendingCount) {
            try {
                this.executorService.submit(() -> this.__peekedMessages());
            } catch (RejectedExecutionException ignored) {
                // The refill task queue is full, so refills are already pending; the demand
                // stays recorded in pollCount and will be served by one of them.
            }
        }
        // Sampled once, before any waiting; expiry checks below all use this timestamp.
        long now = this.clock.millis();
        boolean waited = false;
        // Loop on the number of collected messages rather than on an index, so that the
        // one-time wait below does not use up one of the requested slots.
        while (messages.size() < count) {
            try {
                QueueMessage message = this.peekedMessages.poll();
                if (message == null) {
                    if (waited) {
                        return messages;
                    }
                    // Flag first: should the sleep throw, the next empty poll ends the loop.
                    waited = true;
                    Uninterruptibles.sleepUninterruptibly(waitTime, timeUnit);
                    continue;
                }
                if (now > message.getExpiry()) {
                    // Stale buffer: drop everything and register the demand again so that a
                    // later fetch replaces what was discarded.
                    this.peekedMessages.clear();
                    this.pollCount.addAndGet(count);
                    return new ArrayList<>();
                }
                messages.add(message);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        return messages;
    }

    /**
     * @return the offset, in the same unit as {@link Clock#millis()}, added to the fetch time to
     *     compute a buffered message's expiry
     */
    public int getQueueUnackTime() {
        return this.queueUnackTime;
    }

    /**
     * @param queueUnackTime offset added to the fetch time to compute a buffered message's expiry
     */
    public void setQueueUnackTime(int queueUnackTime) {
        this.queueUnackTime = queueUnackTime;
    }

    /**
     * Fetches messages from the backing queue.
     *
     * <p>The result is read as a flat list of pairs: element {@code 2k} is a message id and
     * element {@code 2k + 1} is its score as a decimal string. The fractional part of the score,
     * multiplied by 100, is used as the message priority.
     *
     * @param now fetch timestamp (clock millis + 1) as a double
     * @param maxTime {@code now} plus the unack time
     * @param count maximum number of messages to fetch
     * @return alternating id / score entries, or {@code null} when nothing could be fetched
     */
    protected abstract List<String> pollMessages(double now, double maxTime, int count);

    /**
     * Reports the size of the backing queue. Not invoked by this class itself.
     *
     * @return the queue size as defined by the subclass
     */
    protected abstract long queueSize();

    /**
     * Fetches up to {@code min(maxPollCount, pollCount)} messages and appends them to the buffer.
     *
     * <p>The outstanding demand is reduced by the number of messages actually buffered, even if
     * processing the response fails part-way. Failures are logged and never propagated.
     */
    private synchronized void __peekedMessages() {
        int buffered = 0;
        try {
            int count = Math.min(this.maxPollCount, this.pollCount.get());
            if (count <= 0) {
                if (count < 0) {
                    log.warn("Negative poll count {}", this.pollCount.get());
                    this.pollCount.set(0);
                }
                return;
            }
            log.trace("Polling {} messages from {} with size {}", count, this.queueName, this.size);
            long nowMillis = this.clock.millis() + 1L;
            double now = (double) nowMillis;
            double maxTime = now + (double) this.queueUnackTime;
            long messageExpiry = nowMillis + (long) this.queueUnackTime;
            List<String> response = this.pollMessages(now, maxTime, count);
            if (response == null) {
                return;
            }
            if (response.size() % 2 != 0) {
                log.warn("Odd number of entries ({}) polled from {}; ignoring the trailing one", response.size(), this.queueName);
            }
            // Entries come in (id, score) pairs; a dangling trailing id is skipped.
            for (int i = 0; i + 1 < response.size(); i += 2) {
                long timeout = 0L;
                String id = response.get(i);
                String scoreString = response.get(i + 1);
                int priority = new BigDecimal(scoreString).remainder(BigDecimal.ONE).multiply(HUNDRED).intValue();
                QueueMessage message = new QueueMessage(id, "", timeout, priority);
                message.setExpiry(messageExpiry);
                this.peekedMessages.add(message);
                ++buffered;
            }
        } catch (Throwable t) {
            log.warn(t.getMessage(), t);
        } finally {
            // Account for what was really buffered, so a failure half-way through the response
            // does not leave already-satisfied demand outstanding.
            if (buffered > 0) {
                this.pollCount.addAndGet(-buffered);
            }
        }
    }
}

