/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.postgres.dao;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.postgres.dao.PostgresBaseDAO;
import com.netflix.conductor.postgres.util.ExecutorsUtil;
import com.netflix.conductor.postgres.util.Query;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.sql.DataSource;
import org.springframework.retry.support.RetryTemplate;

public class PostgresQueueDAO
extends PostgresBaseDAO
implements QueueDAO {
    private static final Long UNACK_SCHEDULE_MS = 60000L;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ExecutorsUtil.newNamedThreadFactory("postgres-queue-"));

    public PostgresQueueDAO(RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
        super(retryTemplate, objectMapper, dataSource);
        this.scheduledExecutorService.scheduleAtFixedRate(this::processAllUnacks, UNACK_SCHEDULE_MS, UNACK_SCHEDULE_MS, TimeUnit.MILLISECONDS);
        this.logger.debug("{} is ready to serve", (Object)PostgresQueueDAO.class.getName());
    }

    @PreDestroy
    public void destroy() {
        try {
            this.scheduledExecutorService.shutdown();
            if (this.scheduledExecutorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                this.logger.debug("tasks completed, shutting down");
            } else {
                this.logger.warn("Forcing shutdown after waiting for 30 seconds");
                this.scheduledExecutorService.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            this.logger.warn("Shutdown interrupted, invoking shutdownNow on scheduledExecutorService for processAllUnacks", (Throwable)ie);
            this.scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void push(String queueName, String messageId, long offsetTimeInSecond) {
        this.push(queueName, messageId, 0, offsetTimeInSecond);
    }

    public void push(String queueName, String messageId, int priority, long offsetTimeInSecond) {
        this.withTransaction(tx -> this.pushMessage((Connection)tx, queueName, messageId, null, priority, offsetTimeInSecond));
    }

    public void push(String queueName, List<Message> messages) {
        this.withTransaction(tx -> messages.forEach(message -> this.pushMessage((Connection)tx, queueName, message.getId(), message.getPayload(), message.getPriority(), 0L)));
    }

    public boolean pushIfNotExists(String queueName, String messageId, long offsetTimeInSecond) {
        return this.pushIfNotExists(queueName, messageId, 0, offsetTimeInSecond);
    }

    public boolean pushIfNotExists(String queueName, String messageId, int priority, long offsetTimeInSecond) {
        return this.getWithRetriedTransactions(tx -> {
            if (!this.existsMessage(tx, queueName, messageId)) {
                this.pushMessage(tx, queueName, messageId, null, priority, offsetTimeInSecond);
                return true;
            }
            return false;
        });
    }

    public List<String> pop(String queueName, int count, int timeout) {
        return this.pollMessages(queueName, count, timeout).stream().map(Message::getId).collect(Collectors.toList());
    }

    public List<Message> pollMessages(String queueName, int count, int timeout) {
        if (timeout < 1) {
            List messages = this.getWithTransactionWithOutErrorPropagation(tx -> this.popMessages(tx, queueName, count, timeout));
            if (messages == null) {
                return new ArrayList<Message>();
            }
            return messages;
        }
        long start = System.currentTimeMillis();
        ArrayList<Message> messages = new ArrayList<Message>();
        while (true) {
            List messagesSlice;
            if ((messagesSlice = this.getWithTransactionWithOutErrorPropagation(tx -> this.popMessages(tx, queueName, count - messages.size(), timeout))) == null) {
                this.logger.warn("Unable to poll {} messages from {} due to tx conflict, only {} popped", new Object[]{count, queueName, messages.size()});
                return messages;
            }
            messages.addAll(messagesSlice);
            if (messages.size() >= count || System.currentTimeMillis() - start > (long)timeout) {
                return messages;
            }
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        }
    }

    public void remove(String queueName, String messageId) {
        this.withTransaction(tx -> this.removeMessage((Connection)tx, queueName, messageId));
    }

    public int getSize(String queueName) {
        String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = ?";
        return this.queryWithTransaction("SELECT COUNT(*) FROM queue_message WHERE queue_name = ?", q -> Long.valueOf(q.addParameter(queueName).executeCount()).intValue());
    }

    public boolean ack(String queueName, String messageId) {
        return this.getWithRetriedTransactions(tx -> this.removeMessage(tx, queueName, messageId));
    }

    public boolean setUnackTimeout(String queueName, String messageId, long unackTimeout) {
        long updatedOffsetTimeInSecond = unackTimeout / 1000L;
        String UPDATE_UNACK_TIMEOUT = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) WHERE queue_name = ? AND message_id = ?";
        return this.queryWithTransaction("UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) WHERE queue_name = ? AND message_id = ?", q -> q.addParameter(updatedOffsetTimeInSecond).addParameter(updatedOffsetTimeInSecond).addParameter(queueName).addParameter(messageId).executeUpdate()) == 1;
    }

    public void flush(String queueName) {
        String FLUSH_QUEUE = "DELETE FROM queue_message WHERE queue_name = ?";
        this.executeWithTransaction("DELETE FROM queue_message WHERE queue_name = ?", q -> q.addParameter(queueName).executeDelete());
    }

    public Map<String, Long> queuesDetail() {
        String GET_QUEUES_DETAIL = "SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q FOR SHARE SKIP LOCKED";
        return this.queryWithTransaction("SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q FOR SHARE SKIP LOCKED", q -> q.executeAndFetch(rs -> {
            HashMap detail = Maps.newHashMap();
            while (rs.next()) {
                String queueName = rs.getString("queue_name");
                Long size = rs.getLong("size");
                detail.put(queueName, size);
            }
            return detail;
        }));
    }

    public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
        String GET_QUEUES_DETAIL_VERBOSE = "SELECT queue_name, \n       (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n       (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \nFROM queue q FOR SHARE SKIP LOCKED";
        return this.queryWithTransaction("SELECT queue_name, \n       (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n       (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \nFROM queue q FOR SHARE SKIP LOCKED", q -> q.executeAndFetch(rs -> {
            HashMap result = Maps.newHashMap();
            while (rs.next()) {
                String queueName = rs.getString("queue_name");
                Long size = rs.getLong("size");
                Long queueUnacked = rs.getLong("uacked");
                result.put(queueName, ImmutableMap.of((Object)"a", (Object)ImmutableMap.of((Object)"size", (Object)size, (Object)"uacked", (Object)queueUnacked)));
            }
            return result;
        }));
    }

    public void processAllUnacks() {
        this.logger.trace("processAllUnacks started");
        this.getWithRetriedTransactions(tx -> {
            String LOCK_TASKS = "SELECT queue_name, message_id FROM queue_message WHERE popped = true AND (deliver_on + (60 ||' seconds')::interval)  <  current_timestamp limit 1000 FOR UPDATE SKIP LOCKED";
            List messages = this.query(tx, LOCK_TASKS, p -> p.executeAndFetch(rs -> {
                ArrayList<QueueMessage> results = new ArrayList<QueueMessage>();
                while (rs.next()) {
                    QueueMessage qm = new QueueMessage();
                    qm.queueName = rs.getString("queue_name");
                    qm.messageId = rs.getString("message_id");
                    results.add(qm);
                }
                return results;
            }));
            if (messages.size() == 0) {
                return 0;
            }
            HashMap queueMessageMap = new HashMap();
            for (QueueMessage qm : messages) {
                if (!queueMessageMap.containsKey(qm.queueName)) {
                    queueMessageMap.put(qm.queueName, new ArrayList());
                }
                ((List)queueMessageMap.get(qm.queueName)).add(qm.messageId);
            }
            int totalUnacked = 0;
            for (String queueName : queueMessageMap.keySet()) {
                Integer unacked = 0;
                try {
                    List msgIds = (List)queueMessageMap.get(queueName);
                    String UPDATE_POPPED = String.format("UPDATE queue_message SET popped = false WHERE queue_name = ? and message_id IN (%s)", Query.generateInBindings(msgIds.size()));
                    unacked = this.query(tx, UPDATE_POPPED, q -> q.addParameter(queueName).addParameters(msgIds).executeUpdate());
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                totalUnacked += unacked.intValue();
                this.logger.debug("Unacked {} messages from all queues", (Object)unacked);
            }
            if (totalUnacked > 0) {
                this.logger.debug("Unacked {} messages from all queues", (Object)totalUnacked);
            }
            return totalUnacked;
        });
    }

    public void processUnacks(String queueName) {
        String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp - (60 ||' seconds')::interval)  > deliver_on";
        this.executeWithTransaction("UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp - (60 ||' seconds')::interval)  > deliver_on", q -> q.addParameter(queueName).executeUpdate());
    }

    public boolean resetOffsetTime(String queueName, String messageId) {
        long offsetTimeInSecond = 0L;
        String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) \nWHERE queue_name = ? AND message_id = ?";
        return this.queryWithTransaction("UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) \nWHERE queue_name = ? AND message_id = ?", q -> q.addParameter(offsetTimeInSecond).addParameter(offsetTimeInSecond).addParameter(queueName).addParameter(messageId).executeUpdate() == 1);
    }

    private boolean existsMessage(Connection connection, String queueName, String messageId) {
        String EXISTS_MESSAGE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?) FOR SHARE";
        return this.query(connection, "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?) FOR SHARE", q -> q.addParameter(queueName).addParameter(messageId).exists());
    }

    private void pushMessage(Connection connection, String queueName, String messageId, String payload, Integer priority, long offsetTimeInSecond) {
        this.createQueueIfNotExists(connection, queueName);
        String UPDATE_MESSAGE = "UPDATE queue_message SET payload=?, deliver_on=(current_timestamp + (? ||' seconds')::interval) WHERE queue_name = ? AND message_id = ?";
        int rowsUpdated = this.query(connection, UPDATE_MESSAGE, q -> q.addParameter(payload).addParameter(offsetTimeInSecond).addParameter(queueName).addParameter(messageId).executeUpdate());
        if (rowsUpdated == 0) {
            String PUSH_MESSAGE = "INSERT INTO queue_message (deliver_on, queue_name, message_id, priority, offset_time_seconds, payload) VALUES ((current_timestamp + (? ||' seconds')::interval), ?,?,?,?,?) ON CONFLICT (queue_name,message_id) DO UPDATE SET payload=excluded.payload, deliver_on=excluded.deliver_on";
            this.execute(connection, PUSH_MESSAGE, q -> q.addParameter(offsetTimeInSecond).addParameter(queueName).addParameter(messageId).addParameter(priority).addParameter(offsetTimeInSecond).addParameter(payload).executeUpdate());
        }
    }

    private boolean removeMessage(Connection connection, String queueName, String messageId) {
        String REMOVE_MESSAGE = "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?";
        return this.query(connection, "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?", q -> q.addParameter(queueName).addParameter(messageId).executeDelete());
    }

    private List<Message> peekMessages(Connection connection, String queueName, int count) {
        if (count < 1) {
            return Collections.emptyList();
        }
        String PEEK_MESSAGES = "SELECT message_id, priority, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED";
        return this.query(connection, "SELECT message_id, priority, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED", p -> p.addParameter(queueName).addParameter(count).executeAndFetch(rs -> {
            ArrayList<Message> results = new ArrayList<Message>();
            while (rs.next()) {
                Message m = new Message();
                m.setId(rs.getString("message_id"));
                m.setPriority(rs.getInt("priority"));
                m.setPayload(rs.getString("payload"));
                results.add(m);
            }
            return results;
        }));
    }

    private List<Message> popMessages(Connection connection, String queueName, int count, int timeout) {
        List<Message> messages = this.peekMessages(connection, queueName, count);
        if (messages.isEmpty()) {
            return messages;
        }
        ArrayList<Message> poppedMessages = new ArrayList<Message>();
        for (Message message : messages) {
            String POP_MESSAGE = "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false";
            int result = this.query(connection, "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false", q -> q.addParameter(queueName).addParameter(message.getId()).executeUpdate());
            if (result != 1) continue;
            poppedMessages.add(message);
        }
        return poppedMessages;
    }

    public boolean containsMessage(String queueName, String messageId) {
        return this.getWithRetriedTransactions(tx -> this.existsMessage(tx, queueName, messageId));
    }

    private void createQueueIfNotExists(Connection connection, String queueName) {
        this.logger.trace("Creating new queue '{}'", (Object)queueName);
        String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = ?) FOR SHARE";
        boolean exists = this.query(connection, "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = ?) FOR SHARE", q -> q.addParameter(queueName).exists());
        if (!exists) {
            String CREATE_QUEUE = "INSERT INTO queue (queue_name) VALUES (?) ON CONFLICT (queue_name) DO NOTHING";
            this.execute(connection, "INSERT INTO queue (queue_name) VALUES (?) ON CONFLICT (queue_name) DO NOTHING", q -> q.addParameter(queueName).executeUpdate());
        }
    }

    private class QueueMessage {
        public String queueName;
        public String messageId;

        private QueueMessage() {
        }
    }
}

