/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.jms.jdbc;

import com.caucho.jms.JMSExceptionWrapper;
import com.caucho.jms.jdbc.JdbcManager;
import com.caucho.jms.jdbc.JdbcMessage;
import com.caucho.jms.jdbc.JdbcQueue;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.session.MessageConsumerImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.L10N;
import com.rc.retroweaver.runtime.ClassLiteral;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.sql.DataSource;

public class JdbcQueueConsumer
extends MessageConsumerImpl
implements QueueReceiver,
AlarmListener {
    static final Logger log = Log.open(ClassLiteral.getClass((String)"com/caucho/jms/jdbc/JdbcQueueConsumer"));
    static final L10N L = new L10N(ClassLiteral.getClass((String)"com/caucho/jms/jdbc/JdbcQueueConsumer"));
    private static final long QUEUE_TIMEOUT = 3600000L;
    private JdbcManager _jdbcManager;
    private JdbcQueue _queue;
    private long _consumerId;
    private boolean _autoAck;
    private boolean _isClosed;
    private Alarm _alarm;
    private long _lastPurgeTime;

    public JdbcQueueConsumer(SessionImpl session, String messageSelector, JdbcManager jdbcManager, JdbcQueue queue) throws JMSException {
        block5: {
            block4: {
                super(session, messageSelector, queue, false);
                this._jdbcManager = jdbcManager;
                this._queue = queue;
                if (session.getAcknowledgeMode() == 1) break block4;
                if (session.getAcknowledgeMode() != 3) break block5;
            }
            this._autoAck = true;
        }
        this.createQueue();
        this._alarm = new Alarm(this, 900000L);
        if (log.isLoggable(Level.FINE)) {
            log.fine("JdbcQueueConsumer[" + queue + "," + this._consumerId + "] created");
        }
    }

    public Queue getQueue() {
        return this._queue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createQueue() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "INSERT INTO " + consumerTable + " (queue, expire) VALUES (?,?)";
                PreparedStatement pstmt = conn.prepareStatement(sql, 1);
                pstmt.setInt(1, this._queue.getId());
                pstmt.setLong(2, Alarm.getCurrentTime() + 3600000L);
                pstmt.executeUpdate();
                ResultSet rsKeys = pstmt.getGeneratedKeys();
                if (!rsKeys.next()) {
                    throw new JMSException(L.l("consumer insert didn't create a key"));
                }
                this._consumerId = rsKeys.getLong(1);
                rsKeys.close();
                pstmt.close();
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deleteQueue() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "DELETE FROM " + consumerTable + " WHERE s_id=?";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                pstmt.setLong(1, this._consumerId);
                pstmt.executeUpdate();
                pstmt.close();
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected MessageImpl receiveImpl() throws JMSException {
        try {
            this.purgeExpiredConsumers();
            this._queue.purgeExpiredMessages();
            long minId = -1L;
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            JdbcMessage jdbcMessage = this._jdbcManager.getJdbcMessage();
            Connection conn = dataSource.getConnection();
            try {
                MessageImpl msg;
                int updateCount;
                String sql = "SELECT m_id, msg_type, delivered, body, header FROM " + messageTable + " WHERE ?<m_id AND queue=?" + "   AND consumer IS NULL AND ?<=expire" + " ORDER BY m_id";
                PreparedStatement selectStmt = conn.prepareStatement(sql);
                selectStmt.setFetchSize(1);
                sql = this._autoAck ? "DELETE FROM " + messageTable + " WHERE m_id=? AND consumer IS NULL" : "UPDATE " + messageTable + " SET consumer=?, delivered=1" + " WHERE m_id=? AND consumer IS NULL";
                PreparedStatement updateStmt = conn.prepareStatement(sql);
                long id = -1L;
                do {
                    id = -1L;
                    selectStmt.setLong(1, minId);
                    selectStmt.setInt(2, this._queue.getId());
                    selectStmt.setLong(3, Alarm.getCurrentTime());
                    msg = null;
                    ResultSet rs = selectStmt.executeQuery();
                    while (rs.next()) {
                        minId = id = rs.getLong(1);
                        msg = jdbcMessage.readMessage(rs);
                        if (this._selector == null || this._selector.isMatch(msg)) break;
                        msg = null;
                    }
                    rs.close();
                    if (msg == null) {
                        MessageImpl messageImpl = null;
                        return messageImpl;
                    }
                    if (this._autoAck) {
                        updateStmt.setLong(1, id);
                        continue;
                    }
                    updateStmt.setLong(1, this._consumerId);
                    updateStmt.setLong(2, id);
                } while ((updateCount = updateStmt.executeUpdate()) != 1);
                MessageImpl messageImpl = msg;
                return messageImpl;
            }
            finally {
                conn.close();
            }
        }
        catch (IOException e) {
            throw new JMSExceptionWrapper(e);
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void acknowledge() throws JMSException {
        if (this._autoAck) {
            return;
        }
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "DELETE FROM " + messageTable + " " + "WHERE consumer=?";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                pstmt.setLong(1, this._consumerId);
                pstmt.executeUpdate();
                pstmt.close();
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void rollback() throws JMSException {
        if (this._autoAck) {
            return;
        }
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "UPDATE " + messageTable + " SET consumer=NULL " + " WHERE consumer=?";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                pstmt.setLong(1, this._consumerId);
                pstmt.executeUpdate();
                pstmt.close();
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void purgeExpiredConsumers() {
        long now = Alarm.getCurrentTime();
        if (now < this._lastPurgeTime + 3600000L) {
            return;
        }
        this._lastPurgeTime = now;
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            JdbcMessage jdbcMessage = this._jdbcManager.getJdbcMessage();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "UPDATE " + messageTable + " SET consumer=NULL" + " WHERE consumer IS NOT NULL" + "  AND EXISTS(SELECT * FROM " + consumerTable + "             WHERE s_id=consumer AND expire<?)";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                pstmt.setLong(1, Alarm.getCurrentTime());
                int count = pstmt.executeUpdate();
                pstmt.close();
                if (count > 0) {
                    log.fine("JMSQueue[" + this._queue.getName() + "] recovered " + count + " messages");
                }
                sql = "DELETE FROM " + consumerTable + " WHERE expire<?";
                pstmt = conn.prepareStatement(sql);
                pstmt.setLong(1, Alarm.getCurrentTime());
                pstmt.executeUpdate();
            }
            finally {
                conn.close();
            }
        }
        catch (Exception e) {
            log.log(Level.FINER, e.toString(), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleAlarm(Alarm alarm) {
        if (this._isClosed) {
            return;
        }
        try {
            Connection conn = this._jdbcManager.getDataSource().getConnection();
            try {
                String consumerTable = this._jdbcManager.getConsumerTable();
                String sql = "UPDATE " + consumerTable + " SET expire=?" + " WHERE s_id=?";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                pstmt.setLong(1, Alarm.getCurrentTime() + 3600000L);
                pstmt.setLong(2, this._consumerId);
                pstmt.executeUpdate();
            }
            finally {
                conn.close();
            }
        }
        catch (Throwable e) {
            log.log(Level.WARNING, e.toString(), e);
        }
        finally {
            this._alarm.queue(900000L);
        }
    }

    public void close() throws JMSException {
        if (this._isClosed) {
            return;
        }
        this._isClosed = true;
        this._alarm.dequeue();
        try {
            this.deleteQueue();
        }
        catch (Throwable e) {
            log.log(Level.WARNING, e.toString(), e);
        }
        super.close();
    }

    public String toString() {
        return "JdbcQueueConsumer[" + this._queue + "," + this._consumerId + "]";
    }
}

