/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.dao.mysql;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.mysql.MySQLBaseDAO;
import com.netflix.conductor.dao.mysql.Query;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.sql.DataSource;
import org.apache.commons.lang.time.DateUtils;

/**
 * {@link QueueDAO} implementation that stores queues in the {@code queue} table and
 * their messages in the {@code queue_message} table.
 *
 * <p>A message is considered deliverable when its {@code popped} flag is false and its
 * {@code deliver_on} timestamp has been reached. Popping a message only flips the
 * {@code popped} flag; the row is deleted by {@link #ack}, {@link #remove} or
 * {@link #flush}.
 */
public class MySQLQueueDAO extends MySQLBaseDAO implements QueueDAO {

    /** Pause between two peeks while waiting for enough messages, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 200L;

    @Inject
    public MySQLQueueDAO(ObjectMapper om, DataSource ds) {
        super(om, ds);
    }

    /**
     * Adds a message without payload to the queue, creating the queue if needed.
     *
     * @param queueName          name of the target queue
     * @param messageId          id of the message
     * @param offsetTimeInSecond delay, in seconds from now, before the message becomes deliverable
     */
    public void push(String queueName, String messageId, long offsetTimeInSecond) {
        withTransaction(tx -> pushMessage((Connection) tx, queueName, messageId, null, offsetTimeInSecond));
    }

    /**
     * Adds all given messages (id and payload) to the queue in a single transaction,
     * each with a delivery offset of zero seconds.
     *
     * @param queueName name of the target queue
     * @param messages  messages to push
     */
    public void push(String queueName, List<Message> messages) {
        withTransaction(tx -> messages.forEach(message ->
                pushMessage((Connection) tx, queueName, message.getId(), message.getPayload(), 0L)));
    }

    /**
     * Adds a message without payload unless a message with the same id is already in the queue.
     *
     * @return {@code true} if the message was pushed, {@code false} if it already existed
     */
    public boolean pushIfNotExists(String queueName, String messageId, long offsetTimeInSecond) {
        return getWithTransaction(tx -> {
            if (existsMessage(tx, queueName, messageId)) {
                return false;
            }
            pushMessage(tx, queueName, messageId, null, offsetTimeInSecond);
            return true;
        });
    }

    /**
     * Pops up to {@code count} deliverable message ids from the queue.
     *
     * <p>If fewer than {@code count} messages are deliverable, the queue is re-peeked every
     * {@value #POLL_INTERVAL_MILLIS} ms until enough are found or {@code timeout} has elapsed.
     *
     * @param queueName name of the queue
     * @param count     maximum number of ids to pop
     * @param timeout   maximum time to wait for {@code count} messages, in milliseconds
     * @return the ids that were marked as popped (possibly fewer than {@code count})
     * @throws ApplicationException with code {@code BACKEND_ERROR} if not all peeked ids could be popped
     */
    public List<String> pop(String queueName, int count, int timeout) {
        long start = System.currentTimeMillis();
        List<String> foundsIds = peekMessages(queueName, count);
        while (foundsIds.size() < count && System.currentTimeMillis() - start < (long) timeout) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            foundsIds = peekMessages(queueName, count);
        }
        // Snapshot into an effectively-final local so the lambda can capture it.
        // NOTE(review): the ids were peeked outside the transaction that pops them; presumably a
        // concurrent consumer popping one of them in between makes popMessages throw - confirm.
        List<String> messageIds = ImmutableList.copyOf(foundsIds);
        return getWithTransaction(tx -> popMessages(tx, queueName, messageIds));
    }

    /**
     * Reads and pops up to {@code count} deliverable messages (id and payload).
     *
     * @param queueName name of the queue
     * @param count     maximum number of messages to return
     * @param timeout   maximum time to wait for {@code count} messages, in milliseconds
     * @return an immutable list of the messages that were read and marked as popped
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the peeked ids could not
     *                              all be read or popped
     */
    public List<Message> pollMessages(String queueName, int count, int timeout) {
        long start = System.currentTimeMillis();
        return getWithTransaction(tx -> {
            List<String> peekedMessageIds = peekMessages(tx, queueName, count);
            while (peekedMessageIds.size() < count && System.currentTimeMillis() - start < (long) timeout) {
                Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                // NOTE(review): this re-peek goes through peekMessages(String, int), i.e. a separate
                // transaction, while tx stays open. Left as-is because re-peeking on tx could change
                // which rows are visible depending on the isolation level - confirm the intent.
                peekedMessageIds = peekMessages(queueName, count);
            }
            List<String> msgIds = ImmutableList.copyOf(peekedMessageIds);
            // Read before popping: readMessages only selects rows with popped = false.
            List<Message> messages = readMessages(tx, queueName, msgIds);
            popMessages(tx, queueName, msgIds);
            return ImmutableList.copyOf(messages);
        });
    }

    /** Deletes the given message from the queue. */
    public void remove(String queueName, String messageId) {
        withTransaction(tx -> removeMessage((Connection) tx, queueName, messageId));
    }

    /**
     * Counts all messages stored for the queue, popped or not.
     *
     * @return the row count narrowed to {@code int}
     */
    public int getSize(String queueName) {
        final String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = ?";
        return queryWithTransaction(GET_QUEUE_SIZE,
                q -> Long.valueOf(q.addParameter(queueName).executeCount()).intValue());
    }

    /**
     * Acknowledges a message by deleting it from the queue.
     *
     * @return {@code true} if the message existed and was removed, {@code false} otherwise
     */
    public boolean ack(String queueName, String messageId) {
        return getWithTransaction(tx -> {
            if (!existsMessage(tx, queueName, messageId)) {
                return false;
            }
            removeMessage(tx, queueName, messageId);
            return true;
        });
    }

    /**
     * Updates the delivery offset of a message from an unack timeout.
     *
     * @param unackTimeout timeout that is divided by 1000 to obtain seconds
     *                     (presumably given in milliseconds - TODO confirm against callers)
     * @return {@code true} if exactly one row was updated
     */
    public boolean setUnackTimeout(String queueName, String messageId, long unackTimeout) {
        // NOTE(review): deliver_on is computed relative to created_on, not the current time.
        final String UPDATE_UNACK_TIMEOUT = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND, ?, created_on) WHERE queue_name = ? AND message_id = ?";
        long updatedOffsetTimeInSecond = unackTimeout / 1000L;
        return queryWithTransaction(UPDATE_UNACK_TIMEOUT, q -> q
                .addParameter(updatedOffsetTimeInSecond)
                .addParameter(updatedOffsetTimeInSecond)
                .addParameter(queueName)
                .addParameter(messageId)
                .executeUpdate()) == 1;
    }

    /** Deletes every message of the queue. */
    public void flush(String queueName) {
        final String FLUSH_QUEUE = "DELETE FROM queue_message WHERE queue_name = ?";
        executeWithTransaction(FLUSH_QUEUE, q -> q.addParameter(queueName).executeDelete());
    }

    /**
     * Returns, for every row of the {@code queue} table, the number of its messages
     * that are not popped.
     *
     * @return map from queue name to un-popped message count
     */
    public Map<String, Long> queuesDetail() {
        final String GET_QUEUES_DETAIL = "SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q";
        return queryWithTransaction(GET_QUEUES_DETAIL, q -> q.executeAndFetch(rs -> {
            Map<String, Long> detail = Maps.newHashMap();
            while (rs.next()) {
                String queueName = rs.getString("queue_name");
                Long size = rs.getLong("size");
                detail.put(queueName, size);
            }
            return detail;
        }));
    }

    /**
     * Returns, for every queue, the counts of un-popped ({@code "size"}) and popped
     * ({@code "uacked"}) messages.
     *
     * @return map from queue name to a single-entry map keyed {@code "a"}, whose value maps
     *         {@code "size"} and {@code "uacked"} to their counts
     */
    public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
        final String GET_QUEUES_DETAIL_VERBOSE = "SELECT queue_name, \n"
                + "       (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n"
                + "       (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \n"
                + "FROM queue q";
        return queryWithTransaction(GET_QUEUES_DETAIL_VERBOSE, q -> q.executeAndFetch(rs -> {
            Map<String, Map<String, Map<String, Long>>> result = Maps.newHashMap();
            while (rs.next()) {
                String queueName = rs.getString("queue_name");
                Long size = rs.getLong("size");
                Long queueUnacked = rs.getLong("uacked");
                // The middle level always has the single fixed key "a"
                // (presumably a placeholder shard name - TODO confirm).
                result.put(queueName, ImmutableMap.of("a", ImmutableMap.of("size", size, "uacked", queueUnacked)));
            }
            return result;
        }));
    }

    /**
     * Makes popped messages deliverable again once their {@code deliver_on} time has passed.
     */
    public void processUnacks(String queueName) {
        final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND CURRENT_TIMESTAMP > deliver_on";
        executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate());
    }

    /**
     * Sets the delivery offset of a message; {@code deliver_on} becomes
     * {@code created_on + offsetTimeInSecond}.
     *
     * @return {@code true} if exactly one row was updated
     */
    public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) {
        final String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND,?,created_on) \n"
                + "WHERE queue_name = ? AND message_id = ?";
        return queryWithTransaction(SET_OFFSET_TIME, q -> q
                .addParameter(offsetTimeInSecond)
                .addParameter(offsetTimeInSecond)
                .addParameter(queueName)
                .addParameter(messageId)
                .executeUpdate() == 1);
    }

    /** Tells whether a message with the given id is stored for the queue (popped or not). */
    private boolean existsMessage(Connection connection, String queueName, String messageId) {
        final String EXISTS_MESSAGE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?)";
        return query(connection, EXISTS_MESSAGE, q -> q.addParameter(queueName).addParameter(messageId).exists());
    }

    /**
     * Inserts the message, or - if a message with that id already exists in the queue -
     * only overwrites its payload. The queue row is created first when missing.
     *
     * <p>NOTE(review): on the update path {@code deliver_on} and {@code offset_time_seconds}
     * are left untouched, so {@code offsetTimeInSecond} is ignored for existing messages.
     */
    private void pushMessage(Connection connection, String queueName, String messageId, String payload, long offsetTimeInSecond) {
        final String PUSH_MESSAGE = "INSERT INTO queue_message (created_on, deliver_on, queue_name, message_id, offset_time_seconds, payload) VALUES (?, ?, ?, ?, ?, ?)";
        final String UPDATE_MESSAGE = "UPDATE queue_message SET payload = ? WHERE queue_name = ? AND message_id = ?";

        createQueueIfNotExists(connection, queueName);

        // Drop the sub-second part so created_on and deliver_on are whole seconds.
        Date now = DateUtils.truncate(new Date(), Calendar.SECOND);
        Date deliverTime = new Date(now.getTime() + offsetTimeInSecond * 1000L);

        if (existsMessage(connection, queueName, messageId)) {
            execute(connection, UPDATE_MESSAGE, q -> q
                    .addParameter(payload)
                    .addParameter(queueName)
                    .addParameter(messageId)
                    .executeUpdate());
        } else {
            execute(connection, PUSH_MESSAGE, q -> q
                    .addTimestampParameter(now)
                    .addTimestampParameter(deliverTime)
                    .addParameter(queueName)
                    .addParameter(messageId)
                    .addParameter(offsetTimeInSecond)
                    .addParameter(payload)
                    .executeUpdate());
        }
    }

    /** Deletes the message row identified by queue name and message id. */
    private void removeMessage(Connection connection, String queueName, String messageId) {
        final String REMOVE_MESSAGE = "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?";
        execute(connection, REMOVE_MESSAGE, q -> q.addParameter(queueName).addParameter(messageId).executeDelete());
    }

    /** Peeks deliverable message ids in a transaction of its own. */
    private List<String> peekMessages(String queueName, int count) {
        return getWithTransaction(tx -> peekMessages(tx, queueName, count));
    }

    /**
     * Selects up to {@code count} ids of un-popped messages that are due, ordered by
     * {@code deliver_on} then {@code created_on}. Does not modify any row.
     *
     * @return the matching ids, or an empty list when {@code count < 1}
     */
    private List<String> peekMessages(Connection connection, String queueName, int count) {
        if (count < 1) {
            return Collections.emptyList();
        }
        // The comparison timestamp is one millisecond ahead of the current time.
        long peekTime = System.currentTimeMillis() + 1L;
        final String PEEK_MESSAGES = "SELECT message_id FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= TIMESTAMP(?) ORDER BY deliver_on, created_on LIMIT ?";
        return query(connection, PEEK_MESSAGES, q -> q
                .addParameter(queueName)
                .addTimestampParameter(peekTime)
                .addParameter(count)
                .executeScalarList(String.class));
    }

    /**
     * Marks the given messages as popped.
     *
     * @return {@code messageIds} unchanged
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the number of updated rows
     *                              differs from the number of ids
     */
    private List<String> popMessages(Connection connection, String queueName, List<String> messageIds) {
        if (messageIds.isEmpty()) {
            return messageIds;
        }
        final String POP_MESSAGES = "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id IN (%s) AND popped = false";
        // Expand the IN (...) placeholder to one bind marker per id.
        String sql = String.format(POP_MESSAGES, Query.generateInBindings(messageIds.size()));
        int result = query(connection, sql, q -> q.addParameter(queueName).addParameters(messageIds).executeUpdate());
        if (result != messageIds.size()) {
            String message = String.format("could not pop all messages for given ids: %s (%d messages were popped)", messageIds, result);
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, message);
        }
        return messageIds;
    }

    /**
     * Loads id and payload of the given un-popped messages, ordered by
     * {@code deliver_on} then {@code created_on}.
     *
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the number of rows found
     *                              differs from the number of ids
     */
    private List<Message> readMessages(Connection connection, String queueName, List<String> messageIds) {
        if (messageIds.isEmpty()) {
            return Collections.emptyList();
        }
        final String READ_MESSAGES = "SELECT message_id, payload FROM queue_message WHERE queue_name = ? AND message_id IN (%s) AND popped = false ORDER BY deliver_on, created_on";
        // Expand the IN (...) placeholder to one bind marker per id.
        String sql = String.format(READ_MESSAGES, Query.generateInBindings(messageIds.size()));
        List<Message> messages = query(connection, sql, p -> p.addParameter(queueName).addParameters(messageIds).executeAndFetch(rs -> {
            List<Message> results = new ArrayList<>();
            while (rs.next()) {
                Message m = new Message();
                m.setId(rs.getString("message_id"));
                m.setPayload(rs.getString("payload"));
                results.add(m);
            }
            return results;
        }));
        if (messages.size() != messageIds.size()) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "could not read all messages for given ids: " + messageIds);
        }
        return messages;
    }

    /** Inserts a row into the {@code queue} table unless one with that name already exists. */
    private void createQueueIfNotExists(Connection connection, String queueName) {
        final String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = ?)";
        boolean queueExists = query(connection, EXISTS_QUEUE, q -> q.addParameter(queueName).exists());
        if (queueExists) {
            return;
        }
        logger.debug("creating queue {}", queueName);
        final String CREATE_QUEUE = "INSERT INTO queue (queue_name) VALUES (?)";
        execute(connection, CREATE_QUEUE, q -> q.addParameter(queueName).executeUpdate());
    }
}

