/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.jms.jdbc;

import com.caucho.jms.JMSExceptionWrapper;
import com.caucho.jms.jdbc.JdbcManager;
import com.caucho.jms.jdbc.JdbcMessage;
import com.caucho.jms.jdbc.JdbcTopic;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.session.MessageConsumerImpl;
import com.caucho.jms.session.SessionImpl;
import com.caucho.log.Log;
import com.caucho.util.Alarm;
import com.caucho.util.AlarmListener;
import com.caucho.util.L10N;
import com.rc.retroweaver.runtime.ClassLiteral;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.sql.DataSource;

/**
 * Topic consumer whose read/acknowledge position is stored in the JDBC
 * consumer table managed by a {@link JdbcManager}.
 *
 * <p>Two kinds of consumer rows are created:
 * <ul>
 * <li>an anonymous row whose {@code expire} column is set to
 *     "now + TOPIC_TIMEOUT" and pushed forward periodically by
 *     {@link #handleAlarm(Alarm)}; the row is deleted on {@link #close()};</li>
 * <li>a named row, keyed by (queue, client, name), which is reused if it
 *     already exists and is created with a far-future {@code expire} value.</li>
 * </ul>
 *
 * <p>All JDBC statements and result sets are released in {@code finally}
 * blocks so that a failure part-way through an operation does not leak
 * them on the connection.
 */
public class JdbcTopicConsumer
extends MessageConsumerImpl
implements AlarmListener,
TopicSubscriber {
    static final Logger log = Log.open(ClassLiteral.getClass((String)"com/caucho/jms/jdbc/JdbcTopicConsumer"));
    static final L10N L = new L10N(ClassLiteral.getClass((String)"com/caucho/jms/jdbc/JdbcTopicConsumer"));

    /** Milliseconds added to the current time when (re)setting a consumer row's expire column. */
    private static final long TOPIC_TIMEOUT = 3600000L;
    /** Milliseconds between alarm-driven refreshes of the expire column (a quarter of TOPIC_TIMEOUT). */
    private static final long TOPIC_REFRESH_INTERVAL = TOPIC_TIMEOUT / 4L;
    /** Expire value written for named consumer rows. */
    private static final long NAMED_CONSUMER_EXPIRE = 0x3FFFFFFFFFFFFFFFL;

    private JdbcManager _jdbcManager;
    private JdbcTopic _topic;
    private String _subscriber;
    private long _consumerId;
    private long _lastPurgeTime;
    private boolean _isClosed;
    private Alarm _alarm;

    /**
     * Creates an anonymous consumer: inserts a new consumer row and starts
     * the alarm that keeps the row's expire time in the future.
     *
     * @throws JMSException if the consumer row cannot be created
     */
    public JdbcTopicConsumer(SessionImpl session, String messageSelector, JdbcManager jdbcManager, JdbcTopic topic, boolean noLocal) throws JMSException {
        super(session, messageSelector, topic, noLocal);
        this._jdbcManager = jdbcManager;
        this._topic = topic;
        this.createTopic();
        this._alarm = new Alarm(this, TOPIC_REFRESH_INTERVAL);
    }

    /**
     * Creates a named consumer: reuses the consumer row matching the topic,
     * the session's client id and {@code name}, or inserts one if none exists.
     * No refresh alarm is created for named consumers.
     *
     * @throws JMSException if the consumer row cannot be looked up or created
     */
    public JdbcTopicConsumer(SessionImpl session, String messageSelector, JdbcManager jdbcManager, JdbcTopic topic, boolean noLocal, String name) throws JMSException {
        super(session, messageSelector, topic, noLocal);
        this._jdbcManager = jdbcManager;
        this._topic = topic;
        this._subscriber = name;
        this.createTopic(name);
    }

    /** Returns the topic this consumer reads from. */
    public Topic getTopic() {
        return this._topic;
    }

    /**
     * Inserts the row for an anonymous consumer, positioned after the newest
     * message currently stored for the topic, and records the generated id.
     */
    private void createTopic() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            String messageTable = this._jdbcManager.getMessageTable();
            Connection conn = dataSource.getConnection();
            try {
                long max = this.selectMaxMessageId(conn, messageTable);
                String sql = "INSERT INTO " + consumerTable + " (queue, expire, read, ack) VALUES (?,?,?,?)";
                PreparedStatement pstmt = conn.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
                try {
                    pstmt.setInt(1, this._topic.getId());
                    pstmt.setLong(2, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
                    pstmt.setLong(3, max);
                    pstmt.setLong(4, max);
                    pstmt.executeUpdate();
                    this._consumerId = JdbcTopicConsumer.readGeneratedConsumerId(pstmt);
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(pstmt);
                }
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /** Deletes this consumer's row from the consumer table. */
    private void deleteTopic() throws JMSException {
        String consumerTable = this._jdbcManager.getConsumerTable();
        this.executeForConsumer("DELETE FROM " + consumerTable + " WHERE s_id=?");
    }

    /**
     * Looks up the row for the named consumer and, if it does not exist,
     * inserts it positioned after the newest message currently stored for
     * the topic. Either way the row id is recorded in {@code _consumerId}.
     */
    private void createTopic(String name) throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String consumerTable = this._jdbcManager.getConsumerTable();
            String messageTable = this._jdbcManager.getMessageTable();
            String clientId = this._session.getClientID();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "SELECT s_id FROM " + consumerTable + " WHERE queue=? AND client=? AND name=?";
                PreparedStatement lookupStmt = conn.prepareStatement(sql);
                ResultSet rs = null;
                try {
                    lookupStmt.setInt(1, this._topic.getId());
                    lookupStmt.setString(2, clientId);
                    lookupStmt.setString(3, name);
                    rs = lookupStmt.executeQuery();
                    if (rs.next()) {
                        // An existing subscription keeps its stored read/ack position.
                        this._consumerId = rs.getLong(1);
                        return;
                    }
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(rs);
                    JdbcTopicConsumer.closeQuietly(lookupStmt);
                }
                long max = this.selectMaxMessageId(conn, messageTable);
                sql = "INSERT INTO " + consumerTable + " (queue, client, name, expire, read, ack) VALUES (?,?,?,?,?,?)";
                PreparedStatement insertStmt = conn.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
                try {
                    insertStmt.setInt(1, this._topic.getId());
                    insertStmt.setString(2, clientId);
                    insertStmt.setString(3, name);
                    insertStmt.setLong(4, NAMED_CONSUMER_EXPIRE);
                    insertStmt.setLong(5, max);
                    insertStmt.setLong(6, max);
                    insertStmt.executeUpdate();
                    this._consumerId = JdbcTopicConsumer.readGeneratedConsumerId(insertStmt);
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(insertStmt);
                }
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /**
     * Returns the next unread, unexpired message that matches the selector
     * (if any) and advances this consumer's {@code read} position to it.
     *
     * @return the next matching message, or {@code null} if none is available
     * @throws JMSException if the database access or message decoding fails
     */
    protected MessageImpl receiveImpl() throws JMSException {
        this.purgeExpiredConsumers();
        this._topic.purgeExpiredMessages();
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            JdbcMessage jdbcMessage = this._jdbcManager.getJdbcMessage();
            Connection conn = dataSource.getConnection();
            try {
                long id = -1L;
                MessageImpl msg = null;
                String sql = "SELECT m_id, msg_type, delivered, body, header FROM " + messageTable + " AS m," + "      " + consumerTable + " AS s" + " WHERE s_id=? AND m.queue=s.queue AND s.read<m_id" + "   AND ?<m.expire" + " ORDER BY m_id";
                PreparedStatement selectStmt = conn.prepareStatement(sql);
                ResultSet rs = null;
                try {
                    selectStmt.setFetchSize(1);
                    selectStmt.setLong(1, this._consumerId);
                    selectStmt.setLong(2, Alarm.getCurrentTime());
                    rs = selectStmt.executeQuery();
                    // Walk forward until a message passes the selector.
                    while (rs.next()) {
                        id = rs.getLong(1);
                        msg = jdbcMessage.readMessage(rs);
                        if (this._selector == null || this._selector.isMatch(msg)) break;
                        msg = null;
                    }
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(rs);
                    JdbcTopicConsumer.closeQuietly(selectStmt);
                }
                if (msg == null) {
                    return null;
                }
                // Only prepare the UPDATE once there is actually a message to record.
                sql = "UPDATE " + consumerTable + " SET read=?" + " WHERE s_id=?";
                PreparedStatement updateStmt = conn.prepareStatement(sql);
                try {
                    updateStmt.setLong(1, id);
                    updateStmt.setLong(2, this._consumerId);
                    updateStmt.executeUpdate();
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(updateStmt);
                }
                return msg;
            }
            finally {
                conn.close();
            }
        }
        catch (IOException e) {
            throw new JMSExceptionWrapper(e);
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /**
     * Acknowledges everything read so far by copying {@code read} into
     * {@code ack}, then removes messages every consumer of the topic has
     * acknowledged.
     *
     * @throws JMSException if the database access fails
     */
    public void acknowledge() throws JMSException {
        String consumerTable = this._jdbcManager.getConsumerTable();
        this.executeForConsumer("UPDATE " + consumerTable + " SET ack=read " + " WHERE s_id=?");
        // Runs after the first connection is released: deleteOldMessages()
        // opens its own connection, and holding two at once per caller
        // needlessly pressures the pool.
        this.deleteOldMessages();
    }

    /**
     * Deletes the topic's messages for which no consumer row of the same
     * queue has an {@code ack} position below the message id.
     */
    private void deleteOldMessages() throws JMSException {
        try {
            DataSource dataSource = this._jdbcManager.getDataSource();
            String messageTable = this._jdbcManager.getMessageTable();
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection conn = dataSource.getConnection();
            try {
                String sql = "DELETE FROM " + messageTable + " WHERE queue=? AND NOT EXISTS(" + "   SELECT * FROM " + consumerTable + "   WHERE queue=? AND ack < m_id)";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                try {
                    pstmt.setInt(1, this._topic.getId());
                    pstmt.setInt(2, this._topic.getId());
                    pstmt.executeUpdate();
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(pstmt);
                }
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /**
     * Rewinds the {@code read} position to the last acknowledged position so
     * unacknowledged messages are delivered again.
     *
     * @throws JMSException if the database access fails
     */
    public void rollback() throws JMSException {
        String consumerTable = this._jdbcManager.getConsumerTable();
        this.executeForConsumer("UPDATE " + consumerTable + " SET read=ack " + " WHERE s_id=?");
    }

    /**
     * Deletes expired topic consumer rows, at most once per TOPIC_TIMEOUT
     * for this instance. Failures are logged at FINER and otherwise ignored
     * because the purge is best-effort housekeeping.
     */
    private void purgeExpiredConsumers() {
        long now = Alarm.getCurrentTime();
        if (now < this._lastPurgeTime + TOPIC_TIMEOUT) {
            return;
        }
        this._lastPurgeTime = now;
        try {
            String consumerTable = this._jdbcManager.getConsumerTable();
            Connection conn = this._jdbcManager.getDataSource().getConnection();
            try {
                String sql = "DELETE FROM " + consumerTable + " WHERE is_topic=1 AND expire<?";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                try {
                    pstmt.setLong(1, now);
                    pstmt.executeUpdate();
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(pstmt);
                }
            }
            finally {
                conn.close();
            }
        }
        catch (Exception e) {
            log.log(Level.FINER, e.toString(), e);
        }
    }

    /**
     * Alarm callback: pushes this consumer's expire time TOPIC_TIMEOUT into
     * the future and re-queues the alarm. Does nothing once the consumer is
     * closed. Failures are logged and do not stop the alarm from being
     * re-queued.
     */
    public void handleAlarm(Alarm alarm) {
        if (this._isClosed) {
            return;
        }
        try {
            Connection conn = this._jdbcManager.getDataSource().getConnection();
            try {
                String consumerTable = this._jdbcManager.getConsumerTable();
                String sql = "UPDATE " + consumerTable + " SET expire=?" + " WHERE s_id=?";
                PreparedStatement pstmt = conn.prepareStatement(sql);
                try {
                    pstmt.setLong(1, Alarm.getCurrentTime() + TOPIC_TIMEOUT);
                    pstmt.setLong(2, this._consumerId);
                    pstmt.executeUpdate();
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(pstmt);
                }
            }
            finally {
                conn.close();
            }
        }
        catch (Throwable e) {
            log.log(Level.WARNING, e.toString(), e);
        }
        finally {
            // Do not resurrect the alarm if close() ran while the update was
            // in flight, and tolerate instances that never created one.
            if (!this._isClosed && this._alarm != null) {
                this._alarm.queue(TOPIC_REFRESH_INTERVAL);
            }
        }
    }

    /**
     * Closes the consumer once: stops the refresh alarm, then removes the
     * anonymous consumer row or, for a named consumer, calls
     * {@code unsubscribe} on the session. Cleanup failures are logged so the
     * superclass close always runs.
     *
     * @throws JMSException if the superclass close fails
     */
    public void close() throws JMSException {
        if (this._isClosed) {
            return;
        }
        this._isClosed = true;
        if (this._alarm != null) {
            this._alarm.dequeue();
        }
        try {
            if (this._subscriber == null) {
                this.deleteTopic();
            } else {
                this._session.unsubscribe(this._subscriber);
            }
        }
        catch (Throwable e) {
            log.log(Level.WARNING, e.toString(), e);
        }
        super.close();
    }

    public String toString() {
        return "JdbcTopicConsumer[" + this._topic + "," + this._consumerId + "]";
    }

    /**
     * Returns the largest message id stored for this topic, or -1 when the
     * topic has no messages.
     */
    private long selectMaxMessageId(Connection conn, String messageTable) throws SQLException {
        String sql = "SELECT MAX(m_id) FROM " + messageTable + " WHERE queue=?";
        PreparedStatement pstmt = conn.prepareStatement(sql);
        ResultSet rs = null;
        try {
            pstmt.setInt(1, this._topic.getId());
            rs = pstmt.executeQuery();
            long max = -1L;
            if (rs.next()) {
                long value = rs.getLong(1);
                // MAX() over zero rows is SQL NULL, which getLong reports as 0;
                // keep the -1 sentinel in that case.
                if (!rs.wasNull()) {
                    max = value;
                }
            }
            return max;
        }
        finally {
            JdbcTopicConsumer.closeQuietly(rs);
            JdbcTopicConsumer.closeQuietly(pstmt);
        }
    }

    /** Reads the key generated by an executed consumer-row INSERT. */
    private static long readGeneratedConsumerId(PreparedStatement insertStmt) throws SQLException, JMSException {
        ResultSet rsKeys = insertStmt.getGeneratedKeys();
        try {
            if (!rsKeys.next()) {
                throw new JMSException(L.l("consumer insert didn't create a key"));
            }
            return rsKeys.getLong(1);
        }
        finally {
            JdbcTopicConsumer.closeQuietly(rsKeys);
        }
    }

    /**
     * Executes an update whose single parameter is this consumer's row id.
     */
    private void executeForConsumer(String sql) throws JMSException {
        try {
            Connection conn = this._jdbcManager.getDataSource().getConnection();
            try {
                PreparedStatement pstmt = conn.prepareStatement(sql);
                try {
                    pstmt.setLong(1, this._consumerId);
                    pstmt.executeUpdate();
                }
                finally {
                    JdbcTopicConsumer.closeQuietly(pstmt);
                }
            }
            finally {
                conn.close();
            }
        }
        catch (SQLException e) {
            throw new JMSExceptionWrapper(e);
        }
    }

    /** Closes a result set, logging rather than propagating any failure. */
    private static void closeQuietly(ResultSet rs) {
        if (rs == null) {
            return;
        }
        try {
            rs.close();
        }
        catch (SQLException e) {
            log.log(Level.FINER, e.toString(), e);
        }
    }

    /** Closes a statement, logging rather than propagating any failure. */
    private static void closeQuietly(PreparedStatement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        }
        catch (SQLException e) {
            log.log(Level.FINER, e.toString(), e);
        }
    }
}

