random technical thoughts from the Nominet technical team

Spring JMS with Oracle AQ

Posted by tom on Oct 4th, 2007

This probably isn’t news to many, but I only recently noticed that Oracle AQ can manage JMS queues. We don’t use a full-blown J2EE application server, so our options for a JMS broker are either to run a standalone server/cluster (more work for the SAs) or to embed the JMS server in our middleware (more work for me). Using the database adds no new dependencies or software, so this is a pretty neat solution. Another big advantage for us is that it looks like we would be able to create messages inside PL/SQL procedures, but I may be getting ahead of myself here.

There isn’t much to it, but getting to the same point was a lot harder than with the other JMS brokers I have been testing. There is a lot of documentation to go through and it is all fairly general. I also couldn’t find a good example, so I’ve created a quick howto…

First thing is to choose a user that is going to use the queue and set up the grants…

grant execute on sys.dbms_aqadm to mquser;
grant execute on sys.dbms_aq to mquser;
grant execute on sys.dbms_aqin to mquser;
grant execute on sys.dbms_aqjms to mquser;
exec dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','mquser');
exec dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','mquser');

And then as that user create the queue

exec dbms_aqadm.create_queue_table(queue_table=>'JMS_QUEUE_TABLE', queue_payload_type=>'sys.aq$_jms_text_message',multiple_consumers=>false);
exec dbms_aqadm.create_queue(queue_name=>'testmq', queue_table=>'JMS_QUEUE_TABLE');
exec dbms_aqadm.start_queue(queue_name=>'testmq');

This example sets up a point-to-point queue. I haven’t tried using a pub/sub model but at the very least

multiple_consumers=>true

would need to be set.

For the Java/Spring side you’ll need to download the Oracle Containers for Java (OC4J) pack and add aqapi.jar to your classpath.

For sending JMS messages from Spring you need to set up at least three beans. The interesting one is an instance of JmsTemplate, which saves us from writing any boilerplate JMS code. It requires a JMS ConnectionFactory and a default destination, which are the two other beans. These are not so trivial to set up with Oracle AQ, and at the moment I am creating them with extra factory beans.

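import java.util.Properties;
import javax.jms.ConnectionFactory;

public class OracleAqConnectionFactoryInitialiser {
    // Builds an AQ-backed JMS connection factory from a thin JDBC URL (credentials inline).
    public ConnectionFactory createConnectionFactory() throws Exception {
        return oracle.jms.AQjmsFactory.getQueueConnectionFactory("jdbc:oracle:thin:mquser/mqpass@myhost:1521:testtom", new Properties());
    }
}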

(There is also a getQueueConnectionFactory(DataSource ds) method which is probably what you really want)
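If you would rather not have the password sitting in the URL string, one option is to assemble the URL from its parts and pass the credentials in the Properties argument instead. The sketch below only builds those two values. ThinUrlSketch and its method names are made up for illustration, "user" and "password" are the usual Oracle JDBC connection property keys, and it assumes (untested here) that AQjmsFactory hands the Properties through to the JDBC driver.

```java
import java.util.Properties;

public class ThinUrlSketch {
    /** URL in the same form as the example above, with the credentials inline. */
    static String withInlineCredentials(String user, String password,
                                        String host, int port, String sid) {
        return "jdbc:oracle:thin:" + user + "/" + password + "@" + host + ":" + port + ":" + sid;
    }

    /** Same target database, but with no credentials in the URL. */
    static String withoutCredentials(String host, int port, String sid) {
        return "jdbc:oracle:thin:@" + host + ":" + port + ":" + sid;
    }

    /** Credentials as connection properties ("user" and "password" keys). */
    static Properties credentials(String user, String password) {
        Properties info = new Properties();
        info.setProperty("user", user);
        info.setProperty("password", password);
        return info;
    }

    public static void main(String[] args) {
        String url = withoutCredentials("myhost", 1521, "testtom");
        Properties info = credentials("mquser", "mqpass");
        // Assumption: url and info would take the place of the two arguments
        // given to AQjmsFactory.getQueueConnectionFactory(String, Properties).
        System.out.println(url + " with properties " + info.stringPropertyNames());
    }
}
```

The host, port, SID and user values are the ones from the example above; swap in your own.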

Finding the JMS destination has to be done via the connection so we have to create a Spring FactoryBean…

public class OracleAqDestinationFactoryBean implements FactoryBean {
    private QueueConnectionFactory connectionFactory;
    private String queueName;
    private String queueUser;    
 
    @Required
    public void setConnectionFactory(QueueConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }    
 
    @Required
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }    
 
    @Required
    public void setQueueUser(String queueUser) {
        this.queueUser = queueUser;
    }    
 
    public Object getObject() throws Exception {
        // createQueueSession(true,0) - create a transactional session
        AQjmsSession session = (AQjmsSession) connectionFactory.createQueueConnection().createQueueSession(true, 0);
        return session.getQueue(queueUser, queueName);
    }    
 
    public Class getObjectType() {
        return javax.jms.Queue.class;
    }    
 
    public boolean isSingleton() {
        return false;
    }
}
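It may not be obvious why a bean declared with a FactoryBean class ends up being a Queue. Spring recognises the FactoryBean interface and hands out whatever getObject() returns rather than the factory itself. As far as I understand it, when isSingleton() returns false, getObject() is called again for every lookup or injection, which for the class above means a new connection and session each time. The toy container below illustrates just that lookup rule in plain Java. ProductFactory, ToyContainer and CountingFactory are invented names, and this is not how Spring is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class FactoryBeanSketch {
    /** Hypothetical stand-in for Spring's FactoryBean interface. */
    interface ProductFactory<T> {
        T getObject();
        boolean isSingleton();
    }

    /** Toy registry showing only the FactoryBean lookup rule. */
    static class ToyContainer {
        private final Map<String, Object> definitions = new HashMap<>();
        private final Map<String, Object> singletonProducts = new HashMap<>();

        void register(String id, Object bean) {
            definitions.put(id, bean);
        }

        Object getBean(String id) {
            Object bean = definitions.get(id);
            if (!(bean instanceof ProductFactory)) {
                return bean; // ordinary bean: returned as registered
            }
            ProductFactory<?> factory = (ProductFactory<?>) bean;
            if (!factory.isSingleton()) {
                return factory.getObject(); // fresh product on every lookup
            }
            // Singleton product: create once, then reuse the cached instance.
            Object cached = singletonProducts.get(id);
            if (cached == null) {
                cached = factory.getObject();
                singletonProducts.put(id, cached);
            }
            return cached;
        }
    }

    /** Counts getObject() calls; the returned label stands in for a Queue. */
    static class CountingFactory implements ProductFactory<String> {
        final AtomicInteger calls = new AtomicInteger();
        private final boolean singleton;

        CountingFactory(boolean singleton) {
            this.singleton = singleton;
        }

        public String getObject() {
            return "queue#" + calls.incrementAndGet();
        }

        public boolean isSingleton() {
            return singleton;
        }
    }

    public static void main(String[] args) {
        ToyContainer container = new ToyContainer();
        container.register("prototypeQueue", new CountingFactory(false));
        container.register("singletonQueue", new CountingFactory(true));
        // Two lookups each: the non-singleton factory is asked twice, the singleton once.
        System.out.println(container.getBean("prototypeQueue") + ", " + container.getBean("prototypeQueue"));
        System.out.println(container.getBean("singletonQueue") + ", " + container.getBean("singletonQueue"));
    }
}
```

If the destination is looked up from more than one place, returning true from isSingleton() is one way to avoid repeating the work in getObject(), assuming the returned Queue object is safe to share.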

Wire up these factory-beans and the ConnectionFactory and Queue…

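<!-- The initialiser class shown above has no setters, so no properties are set here.
     A dataSource property would only apply to a variant built on getQueueConnectionFactory(DataSource). -->
<bean id="aqConnectionFactoryInitialiser" class="uk.nominet.mq.OracleAqConnectionFactoryInitialiser"/>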
<bean id="testmq" class="uk.nominet.mq.OracleAqDestinationFactoryBean">
    <property name="connectionFactory" ref="aqConnectionFactory"/>
    <property name="queueName" value="testmq"/>
    <property name="queueUser" value="mquser"/>
</bean>    

<bean id="aqConnectionFactory" factory-bean="aqConnectionFactoryInitialiser"
    factory-method="createConnectionFactory" >
</bean>

…and the JmsTemplate…

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="aqConnectionFactory"/>
    <property name="defaultDestination" ref="testmq"/>
    <property name="receiveTimeout" value="30000"/>
</bean>

Once these are configured you can send a message by calling jmsTemplate.send() from anywhere you wish…

        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage txtMsg = session.createTextMessage();
                txtMsg.setText("Hello World!");
                return txtMsg;
            }
        });

Configuring a listener is now simple

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="aqConnectionFactory"/>
    <property name="destination" ref="testmq"/>
    <property name="messageListener" ref="messageListener1" />
    <property name="sessionTransacted" value="true"/>
</bean>
<bean id="messageListener1"
    class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="uk.nominet.contact.mq.TestMqMessageListener"/>
    </constructor-arg>
</bean>

Read the docs for details on using the MessageListenerAdapter.
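By default MessageListenerAdapter converts the incoming message to its payload (a String for a TextMessage) and calls a method named handleMessage on the delegate, so the delegate can be a plain class with no JMS imports. The following is a rough sketch of what such a delegate could look like. The real TestMqMessageListener isn’t shown in this post, so the class body here is hypothetical, and dispatch() is a toy reflective stand-in for the adapter rather than Spring’s code.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ListenerAdapterSketch {
    /** Hypothetical delegate: a plain object with a handleMessage method. */
    public static class TestMqMessageListener {
        final List<String> received = new ArrayList<>();

        // "handleMessage" is MessageListenerAdapter's default listener method name.
        public void handleMessage(String text) {
            received.add(text);
        }
    }

    /** Toy stand-in for the adapter: finds handleMessage(String) by reflection and calls it. */
    static void dispatch(Object delegate, String payload) {
        try {
            Method method = delegate.getClass().getMethod("handleMessage", String.class);
            method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("delegate has no usable handleMessage(String)", e);
        }
    }

    public static void main(String[] args) {
        TestMqMessageListener listener = new TestMqMessageListener();
        dispatch(listener, "Hello World!");
        System.out.println(listener.received);
    }
}
```

If a listener never seems to be called, checking that the delegate has a public method with the expected name and a parameter type matching the payload is a reasonable first step.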

Oracle’s documentation covers both managing queues and using JMS.

24 Responses

  1. Raj Says:

    This article has proved indeed very useful. Thanks for that.
    However I am facing some issues when the queue payload type is not a standard AQ type. It is a CLOB in my case. Whenever the listener tries to initialise I get the JMS-137 error asking me to specify the payload factory. I have tried setting up the queue receiver with the payload factory, but it still fails. Can you please offer any further guidance as to where I should set the payload factory? The payload factory has been created using jpub.
    Thanks for the help
    Raj

  2. tom Says:

    If you are using Spring, it looks like you would have to extend DefaultMessageListenerContainer and override its createConsumer() method so that it calls the appropriate AQjmsSession createConsumer() overload – http://download-west.oracle.com/docs/cd/B14117_01/server.101/b12023/oracle/jms/AQjmsSession.html#createConsumer(javax.jms.Destination,%20java.lang.String,%20java.lang.Object,%20java.lang.String,%20boolean)

    I’ve stuck to using the standard JMS types so I’m afraid I don’t know much about the payload factories.

    You might have more luck on the Oracle Advanced Queueing forum – http://forums.oracle.com/forums/forum.jspa?forumID=66

  3. awy Says:

    Hi, this article very useful but I got error

    oracle.jms.AQjmsException: JMS-190: Queue mquser.testmq not found

    What’s wrong with my code?

  4. tom Says:

    awy:
    Either your queue table really doesn’t exist or the user in your JDBC connection properties doesn’t have permissions to access the queue.

    You should be able to verify the queue has been set up properly by seeing if the queue table you created exists in your schema (‘JMS_QUEUE_TABLE’).
    If it does then try stopping and starting the queue with

    exec dbms_aqadm.stop_queue(queue_name=>'mquser.testmq');
    exec dbms_aqadm.start_queue(queue_name=>'mquser.testmq');

    If this works then I’m not sure what might be wrong and can only suggest debugging OracleAqDestinationFactoryBean.getObject()

  5. awy Says:

    It’s working now, I didn’t read your statement “And then as that user create the queue” and the queue tables created in sys schema.

    Thanks Tom.

  6. OC4J, Spring and messaging - dolejsky.com Says:

    [...] standard JMS message types for AQ works just fine (see this great techblog’s article for description). To turn JTA transactions, add the following to your Spring configuration file: [...]

  7. mumbasa Says:

    I’ve tried to use Oracle AQ using JMS with code from this article and I can do almost everything but dequeue messages.
    (When I start the consumer, messages do move to the fail queue with max retry attempts = 5, but my MessageListener never gets called)
    Can you please post/send source code if it is possible. May be I’m missing something. I can upload my code as well.

    Thank you.

  8. frank Says:

    I tried to use jms XA (distributed transaction) to connect oracle AQ, are there any reference?

  9. tom Says:

    For the moment I’ve been avoiding distributed transactions by publishing messages with stored procedures (which are then part of a normal JDBC transaction).
    Whilst it’s on my list, I’m afraid I haven’t used XA so I don’t think I can be much help.

  10. Brice Copy Says:

    >> (There is also a
    >> getQueueConnectionFactory(DataSource ds)
    >> method which is probably what you really want)

    In effect, you cannot use any datasource (despite the interface declaration), it appears you have to stick to the Oracle implementation (e.g. oracle.jdbc.pool.OracleDataSource) – using an Apache DBCP datasource for instance will result in a JMS-112 (Error creating the db_connection) error message

  11. tom Says:

    > it appears you have to stick to
    > the Oracle implementation

    We only use the OracleDataSource but such a limitation wouldn’t surprise me :-P

    I have seen JMS-112 errors when accidentally using a TransactionAwareDataSourceProxy around the real DataSource (ie. this approach _isn’t_ transactional)

    Sending messages can be made to participate in a transaction by using AQjmsQueueConnectionFactory.createQueueConnection(dataSource.getConnection())

    Tom

  12. Richard Says:

    Hi,

    I’m trying to find some reference to how you instantiated the Destination bean (called “testmq” above) by using the name of the FactoryBean for the class attribute. The closest reference I could find is at http://static.springframework.org/spring/docs/2.5.x/reference/beans.html#beans-factory-class-static-factory-method … but that one requires including a factory-method attribute.

    I’ve scoured the ‘Net looking for where it’s documented.

  13. tom Says:

    FactoryBean is a Spring interface so it knows how to set up the bean.

    http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/beans/factory/FactoryBean.html

  14. Chintan Says:

    How do I create Payload factory and specify to my session or listener?
    Any help would be appreciated.

    Thanks,
    Chintan

  15. David Price Says:

    Superb. Really helped me on my way. Thanks for taking the time to do this.

  16. Amit Says:

    Raj,

    The AQ type aq$_jms_text_message can be used to store CLOB data as well. Check the set_text(payload IN CLOB) member procedure and the text_vc or text_lob fields. The specified payload is automatically stored in a CLOB or VARCHAR2 depending on its length.

  17. Amit Says:

    Thanks Tom for this nice article, was very much of help to me.

  18. tong Says:

    I got an exception as following. (I can enqueue using PL/SQL) Any ideas?

    [org.springframework.jms.listener.DefaultMessageListenerContainer] – Initiating transaction rollback on application exception
    oracle.jms.AQjmsException: JMS-157: IO Exception
    at oracle.jms.AQjmsError.throwEx(AQjmsError.java:315)
    at oracle.jms.AQjmsMapMessage.convertByteArrayToHashMap(AQjmsMapMessage.java:1101)
    at oracle.jms.AQjmsMapMessage.getMap(AQjmsMapMessage.java:1020)

  19. Damien Says:

    Hi

    This tutorial is very useful, I have got around the initial payload problem. has anyone seen this error?
    Setup of JMS message listener invoker failed for destination ‘TEST.MESSAGE_QUEUE’ – trying to recover. Cause: JMS-120: Dequeue failed

  20. tom Says:

    Hi,

    I can’t offer much help, since I wrote this code so long ago (still works though!)

    Could you be seeing a permissions problem? If your JDBC user is different from the schema that owns the queue then you will need to re-run…

    exec dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','jdbcuser');

  21. Pablo Says:

    Hi,

    Firstly, thanks Tom for the clarifying article.
    Damien, I’ve got the same problem JMS-120… I’ve got everything set up, permissions, JDBC working with same queue user, but no dequeues…
    I read something about this problem being related to the manual message deleting from the queue.

    I’m quite blocked with this since a couple of days…

  22. John Says:

    Hi

    This example is extremely useful.
    I have the follow scenario.
    Read message off queue, call around 10 different methods to do work and final commit the message from the queue. If any of these queues throw an error I would like to put the message back in the queue. Is this use case possible with this code sample?

    P.S. Keep up the good work – great article

  23. johnson Says:

    i am getting following error
    Exception in thread “main” java.lang.AbstractMethodError: oracle.jms.AQjmsQueueConnectionFactory.createConnection()Ljavax/jms/Connection;
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:456)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:526)
    at com.boeing.massupdate.jms.JMSTest.main(JMSTest.java:23)

  24. Evgeny Says:

    Johnson,
    That’s because you’re using old Oracle JMS (aqapi.jar).
