/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.dao.mysql;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.mysql.MySQLBaseDAO;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.lang.time.DateUtils;
import org.sql2o.Connection;
import org.sql2o.Sql2o;

/**
 * {@link QueueDAO} implementation backed by the {@code queue} and {@code queue_message}
 * tables, accessed through sql2o.
 *
 * <p>Each message row carries a {@code popped} flag and a {@code deliver_on} timestamp.
 * A message can be handed out by {@link #pop} only while {@code popped = false} and its
 * {@code deliver_on} is due. {@link #processUnacks} flips popped messages whose
 * {@code deliver_on} has passed back to {@code popped = false}.
 */
class MySQLQueueDAO extends MySQLBaseDAO implements QueueDAO {

    /** Pause between two peeks while {@link #pop} waits for more messages to show up. */
    private static final long POP_POLL_INTERVAL_MILLIS = 200L;

    private static final String GET_QUEUE_SIZE =
            "SELECT COUNT(*) FROM queue_message WHERE queue_name = :queueName";

    // The lease is counted from "now": basing it on created_on would yield a deliver_on that is
    // already in the past for any message older than the timeout, so processUnacks would make
    // the message visible again right away.
    private static final String UPDATE_UNACK_TIMEOUT =
            "UPDATE queue_message SET offset_time_seconds = :offsetSeconds, deliver_on = TIMESTAMPADD(SECOND,:offsetSeconds,CURRENT_TIMESTAMP) \nWHERE queue_name = :queueName AND message_id = :messageId";

    private static final String FLUSH_QUEUE =
            "DELETE FROM queue_message WHERE queue_name = :queueName";

    private static final String GET_QUEUES_DETAIL =
            "SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q";

    private static final String GET_QUEUES_DETAIL_VERBOSE =
            "SELECT queue_name, \n       (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n       (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \nFROM queue q";

    private static final String PROCESS_UNACKS =
            "UPDATE queue_message SET popped = false WHERE CURRENT_TIMESTAMP > deliver_on AND popped = true";

    private static final String SET_OFFSET_TIME =
            "UPDATE queue_message SET offset_time_seconds = :offsetSeconds, deliver_on = TIMESTAMPADD(SECOND,:offsetSeconds,created_on) \nWHERE queue_name = :queueName AND message_id = :messageId";

    private static final String EXISTS_MESSAGE =
            "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = :queueName AND message_id = :messageId)";

    private static final String PUSH_MESSAGE =
            "INSERT INTO queue_message (created_on, deliver_on, queue_name, message_id, offset_time_seconds, payload) VALUES (:createdOn, :deliverOn, :queueName, :messageId, :offsetSeconds, :payload)";

    private static final String UPDATE_MESSAGE =
            "UPDATE queue_message SET payload = :payload WHERE queue_name = :queueName AND message_id = :messageId";

    private static final String REMOVE_MESSAGE =
            "DELETE FROM queue_message WHERE queue_name = :queueName AND message_id = :messageId";

    // Only messages that are not currently popped and whose delivery time has arrived are
    // candidates; the ORDER BY makes the LIMIT pick the oldest due messages deterministically.
    private static final String PEEK_MESSAGES =
            "SELECT message_id FROM queue_message WHERE queue_name = :queueName AND popped = false AND deliver_on <= CURRENT_TIMESTAMP ORDER BY deliver_on, created_on LIMIT :count";

    // "AND popped = false" turns the update into a claim: rows another consumer popped between
    // our peek and this update are not matched, which the caller detects via the update count.
    private static final String POP_MESSAGES =
            "UPDATE queue_message SET popped = true WHERE queue_name = :queueName AND message_id IN (%s) AND popped = false";

    private static final String READ_MESSAGES =
            "SELECT message_id, payload FROM queue_message WHERE queue_name = :queueName AND message_id IN (%s)";

    private static final String EXISTS_QUEUE =
            "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = :queueName)";

    private static final String CREATE_QUEUE =
            "INSERT INTO queue (queue_name) VALUES (:queueName)";

    @Inject
    MySQLQueueDAO(ObjectMapper om, Sql2o sql2o) {
        super(om, sql2o);
    }

    /**
     * Pushes a payload-less message that becomes due {@code offsetTimeInSecond} seconds from now.
     * If the id is already present in the queue, the existing row's payload is overwritten with
     * {@code null} instead (see {@link #pushMessage}).
     */
    public void push(String queueName, String messageId, long offsetTimeInSecond) {
        withTransaction(tx -> pushMessage(tx, queueName, messageId, null, offsetTimeInSecond));
    }

    /** Pushes all given messages, without delivery delay, inside a single transaction. */
    public void push(String queueName, List<Message> messages) {
        withTransaction(tx -> {
            for (Message message : messages) {
                pushMessage(tx, queueName, message.getId(), message.getPayload(), 0L);
            }
        });
    }

    /**
     * Pushes a payload-less message unless one with the same id already exists in the queue.
     *
     * @return {@code true} if the message was pushed, {@code false} if it already existed
     */
    public boolean pushIfNotExists(String queueName, String messageId, long offsetTimeInSecond) {
        return getWithTransaction(tx -> {
            if (existsMessage(tx, queueName, messageId)) {
                return false;
            }
            pushMessage(tx, queueName, messageId, null, offsetTimeInSecond);
            return true;
        });
    }

    /**
     * Marks up to {@code count} due, un-popped messages as popped and returns their ids.
     *
     * <p>While fewer than {@code count} messages are available the queue is re-checked every
     * {@value #POP_POLL_INTERVAL_MILLIS} ms until {@code timeout} milliseconds have elapsed;
     * whatever was found by then is popped.
     *
     * @param timeout maximum time to wait for {@code count} messages, in milliseconds
     * @return ids of the popped messages; empty if none were available
     * @throws ApplicationException with code {@code BACKEND_ERROR} if not all peeked messages
     *         could be marked as popped
     */
    public List<String> pop(String queueName, int count, int timeout) {
        long start = System.currentTimeMillis();
        List<String> foundsIds = peekMessages(queueName, count);
        while (foundsIds.size() < count && System.currentTimeMillis() - start < (long) timeout) {
            Uninterruptibles.sleepUninterruptibly(POP_POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            foundsIds = peekMessages(queueName, count);
        }
        ImmutableList<String> messageIds = ImmutableList.copyOf(foundsIds);
        return getWithTransaction(tx -> popMessages(tx, queueName, messageIds));
    }

    /** Pops messages as {@link #pop} does and returns them with their payloads. */
    public List<Message> pollMessages(String queueName, int count, int timeout) {
        List<String> poppedMessageIds = pop(queueName, count, timeout);
        return readMessages(queueName, poppedMessageIds);
    }

    /** Deletes the message from the queue; a missing message is not an error. */
    public void remove(String queueName, String messageId) {
        withTransaction(tx -> removeMessage(tx, queueName, messageId));
    }

    /** Returns the number of rows stored for the queue, regardless of their popped state. */
    public int getSize(String queueName) {
        return getWithTransaction(tx -> tx.createQuery(GET_QUEUE_SIZE)
                .addParameter("queueName", queueName)
                .executeScalar(Integer.class));
    }

    /**
     * Acknowledges a message by deleting it.
     *
     * @return {@code true} if the message existed and was deleted, {@code false} otherwise
     */
    public boolean ack(String queueName, String messageId) {
        return getWithTransaction(tx -> {
            if (!existsMessage(tx, queueName, messageId)) {
                return false;
            }
            removeMessage(tx, queueName, messageId);
            return true;
        });
    }

    /**
     * Moves the message's {@code deliver_on} to {@code unackTimeout} from the current database
     * time, so that {@link #processUnacks} leaves it popped until then.
     *
     * @param unackTimeout timeout in milliseconds; truncated to whole seconds
     * @return {@code true} if exactly one row was updated
     */
    public boolean setUnackTimeout(String queueName, String messageId, long unackTimeout) {
        long updatedOffsetTimeInSecond = unackTimeout / 1000L;
        return getWithTransaction(tx -> tx.createQuery(UPDATE_UNACK_TIMEOUT)
                .addParameter("queueName", queueName)
                .addParameter("messageId", messageId)
                .addParameter("offsetSeconds", updatedOffsetTimeInSecond)
                .executeUpdate()
                .getResult()) == 1;
    }

    /** Deletes every message of the queue. */
    public void flush(String queueName) {
        withTransaction(tx -> tx.createQuery(FLUSH_QUEUE)
                .addParameter("queueName", queueName)
                .executeUpdate());
    }

    /** Returns, for every row of the {@code queue} table, the number of un-popped messages. */
    public Map<String, Long> queuesDetail() {
        Map<String, Long> detail = new HashMap<>();
        withTransaction(tx -> tx.createQuery(GET_QUEUES_DETAIL).executeAndFetchTable().asList().forEach(row -> {
            String queueName = (String) row.get("queue_name");
            Number queueSize = (Number) row.get("size");
            detail.put(queueName, queueSize.longValue());
        }));
        return detail;
    }

    /**
     * Returns, for every row of the {@code queue} table, the un-popped ({@code "size"}) and
     * popped ({@code "uacked"}) message counts, nested under the single fixed key {@code "a"}.
     */
    public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
        Map<String, Map<String, Map<String, Long>>> result = new HashMap<>();
        withTransaction(tx -> tx.createQuery(GET_QUEUES_DETAIL_VERBOSE).executeAndFetchTable().asList().forEach(row -> {
            String queueName = (String) row.get("queue_name");
            Number queueSize = (Number) row.get("size");
            Number queueUnacked = (Number) row.get("uacked");
            Map<String, Long> counts =
                    ImmutableMap.of("size", queueSize.longValue(), "uacked", queueUnacked.longValue());
            Map<String, Map<String, Long>> nested = ImmutableMap.of("a", counts);
            result.put(queueName, nested);
        }));
        return result;
    }

    /**
     * Makes popped messages whose {@code deliver_on} has passed available again.
     *
     * <p>NOTE(review): the statement is not restricted to {@code queueName}; messages of every
     * queue are reset. Left as is because narrowing it could strand messages of queues for which
     * this method is never invoked - confirm against the callers before changing.
     */
    public void processUnacks(String queueName) {
        withTransaction(tx -> tx.createQuery(PROCESS_UNACKS).executeUpdate());
    }

    /**
     * Sets the message's offset and moves its {@code deliver_on} to
     * {@code created_on + offsetTimeInSecond}.
     *
     * @return {@code true} if exactly one row was updated
     */
    public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) {
        return getWithTransaction(tx -> tx.createQuery(SET_OFFSET_TIME)
                .addParameter("queueName", queueName)
                .addParameter("messageId", messageId)
                .addParameter("offsetSeconds", offsetTimeInSecond)
                .executeUpdate()
                .getResult()) == 1;
    }

    /** Tells whether a message with the given id is stored in the queue. */
    private boolean existsMessage(Connection connection, String queueName, String messageId) {
        return connection.createQuery(EXISTS_MESSAGE)
                .addParameter("queueName", queueName)
                .addParameter("messageId", messageId)
                .executeScalar(Boolean.class);
    }

    /**
     * Inserts the message, creating the queue row first if needed. If a message with the same id
     * already exists only its payload is replaced; its timestamps and popped state are kept.
     */
    private void pushMessage(Connection connection, String queueName, String messageId, String payload, long offsetTimeInSecond) {
        createQueueIfNotExists(connection, queueName);

        // Timestamps are stored with whole-second precision.
        Date now = DateUtils.truncate(new Date(), Calendar.SECOND);
        Date deliverTime = new Date(now.getTime() + offsetTimeInSecond * 1000L);

        if (existsMessage(connection, queueName, messageId)) {
            connection.createQuery(UPDATE_MESSAGE)
                    .addParameter("queueName", queueName)
                    .addParameter("messageId", messageId)
                    .addParameter("payload", payload)
                    .executeUpdate();
            return;
        }
        connection.createQuery(PUSH_MESSAGE)
                .addParameter("createdOn", now)
                .addParameter("deliverOn", deliverTime)
                .addParameter("queueName", queueName)
                .addParameter("messageId", messageId)
                .addParameter("offsetSeconds", offsetTimeInSecond)
                .addParameter("payload", payload)
                .executeUpdate();
    }

    /** Deletes the message row, if present. */
    private void removeMessage(Connection connection, String queueName, String messageId) {
        connection.createQuery(REMOVE_MESSAGE)
                .addParameter("queueName", queueName)
                .addParameter("messageId", messageId)
                .executeUpdate();
    }

    /**
     * Returns the ids of up to {@code count} messages that are due and not popped, oldest
     * delivery time first, without modifying them.
     */
    private List<String> peekMessages(String queueName, int count) {
        if (count < 1) {
            return Collections.emptyList();
        }
        return getWithTransaction(tx -> tx.createQuery(PEEK_MESSAGES)
                .addParameter("queueName", queueName)
                .addParameter("count", count)
                .executeScalarList(String.class));
    }

    /**
     * Marks the given messages as popped.
     *
     * @return {@code messageIds}, unchanged
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the number of updated rows
     *         differs from the number of ids, e.g. because a message was removed or popped by
     *         someone else in the meantime
     */
    private List<String> popMessages(Connection connection, String queueName, List<String> messageIds) {
        if (messageIds.isEmpty()) {
            return messageIds;
        }
        String query = generateQueryWithParametersListPlaceholders(POP_MESSAGES, messageIds.size());
        int result = connection.createQuery(query)
                .addParameter("queueName", queueName)
                .withParams(messageIds.toArray())
                .executeUpdate()
                .getResult();
        if (result != messageIds.size()) {
            String message = String.format("could not pop all messages for given ids: %s (%d messages were popped)", messageIds, result);
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, message);
        }
        return messageIds;
    }

    /**
     * Loads id and payload of the given messages.
     *
     * @throws ApplicationException with code {@code BACKEND_ERROR} if not every id was found
     */
    private List<Message> readMessages(String queueName, List<String> messageIds) {
        if (messageIds.isEmpty()) {
            return Collections.emptyList();
        }
        String query = generateQueryWithParametersListPlaceholders(READ_MESSAGES, messageIds.size());
        List<Message> messages = getWithTransaction(tx -> tx.createQuery(query)
                .addParameter("queueName", queueName)
                .addColumnMapping("message_id", "id")
                .withParams(messageIds.toArray())
                .executeAndFetch(Message.class));
        if (messages.size() != messageIds.size()) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "could not read all messages for given ids: " + messageIds);
        }
        return messages;
    }

    /** Inserts a row into {@code queue} for the given name unless one is already there. */
    private void createQueueIfNotExists(Connection connection, String queueName) {
        boolean queueExists = connection.createQuery(EXISTS_QUEUE)
                .addParameter("queueName", queueName)
                .executeScalar(Boolean.class);
        if (queueExists) {
            return;
        }
        logger.info("creating queue {}", queueName);
        connection.createQuery(CREATE_QUEUE)
                .addParameter("queueName", queueName)
                .executeUpdate();
    }
}

