Spring and WebLogic Durable Subscriber

A previous post made it evident that messages can be lost when DefaultMessageListenerContainer is used with Topics. To resolve this, it is necessary to use Spring’s configuration (along with WebLogic’s requirements) for a durable subscriber. This guarantees that messages are not lost when the DefaultMessageListenerContainer disconnects based on the receiveTimeout. A durable subscription also guarantees that as multiple subscribers to the Topic connect and disconnect, each will still be able to process the messages that arrived while it was disconnected.

For a good understanding of durable subscriptions and of JMS concepts in general, an older but good article to read is Guaranteed Messaging with JMS.

Before I get too far into the details, the definitions in this snippet (Persistent vs Durable JMS messages) help clarify the difference between durable and persistent, just to make sure I don’t get the two confused:

A “durable message” is a message where the JMS server will hold on to a message if the subscriber is temporarily unavailable. So the durability is defined by the relationship between a “Topic Subscriber” and the “JMS Server”. Durability is applicable only to the publish/subscribe paradigm. For this to happen, subscribers need to register themselves with a unique “client id”.

A “persistent message” is a message that defines the relationship between a “Message Producer” and the “JMS Server”. This can be established for both point-to-point and publish/subscribe. This has to do with the guaranteed once-only delivery of the message by persisting the message after it has been received from the message producer.

The ActiveMQ documentation gives us a little more information as well (How do durable queues and topics work):

Durable queues keep messages around persistently for any suitable consumer to consume them. Durable queues do not need to concern themselves with which consumer is going to consume the messages at some point in the future. There is just one copy of a message that any consumer in the future can consume.

Durable topics however are different as they must logically persist an instance of each suitable message for every durable consumer – since each durable consumer gets their own copy of the message.
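
To make the persistent side of that distinction concrete in Spring terms, here is a minimal sketch of a producer that asks for persistent delivery. The bean id sampleTopicJmsTemplate is hypothetical and not part of my actual configuration; the connection factory, destination resolver, and topic name are assumed to be the same ones used by the subscribe service in the rest of this post. Durability, by contrast, is configured on the subscriber side.

```xml
<!-- Hypothetical producer-side bean, shown only to illustrate "persistent".
     JmsTemplate applies its delivery mode only when explicitQosEnabled is true. -->
<bean id="sampleTopicJmsTemplate"
    class="org.springframework.jms.core.JmsTemplate"
    p:connectionFactory-ref="sampleMessagingConnectionFactory"
    p:defaultDestinationName="com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:pubSubDomain="true"
    p:explicitQosEnabled="true"
    p:deliveryPersistent="true"/>
```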

So I need to take the following configuration that I had for a subscribe service against a Topic and make it durable.

<bean id="sampleSubscribeServiceListenerContainer"  
    class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    p:connectionFactory-ref="sampleMessagingConnectionFactory"
    p:destinationName="com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:messageListener-ref="sampleSubscribeServiceMessageListener"
    p:pubSubDomain="true"
    p:transactionManager-ref="GlobalDataTransactionManager"/>

The interesting part is that it can take quite a bit of time on Google to find information on how to do this, and the Spring documentation does not really seem to describe what is necessary for a durable subscription.

Here are a few resources with some details as to creating durable subscriber services via Spring:

Persistent JMS Topics using ActiveMQ and Spring

So based on this article and a few more resources, it is apparent that I need to add some additional pieces of configuration:

<bean id="sampleSubscribeServiceListenerContainer"  
    class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    p:connectionFactory-ref="sampleMessagingConnectionFactory"
    p:destinationName="com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:messageListener-ref="sampleSubscribeServiceMessageListener"
    p:pubSubDomain="true"
    p:subscriptionDurable="true"
    p:durableSubscriptionName="sampleSubscribeService"
    p:clientId="sampleSubscribeService"
    p:transactionManager-ref="GlobalDataTransactionManager"/>

So I add three pieces of configuration (subscriptionDurable, durableSubscriptionName, and clientId), and then I deploy this to my server. Interestingly enough, the deployment works but the application fails on startup with the following error:

weblogic.jms.common.IllegalStateException: [JMSClientExceptions:055032]An attempt was made to create a named consumer (sampleSubscribeService) on a connection with no clientID
    at weblogic.jms.client.JMSSession.throwWhenInvalidSubscriberName(JMSSession.java:2770) ~[weblogic.jar:10.3.5.0]
    at weblogic.jms.client.JMSSession.setupConsumer(JMSSession.java:2723) ~[weblogic.jar:10.3.5.0]
    at weblogic.jms.client.JMSSession.createConsumer(JMSSession.java:2691) ~[weblogic.jar:10.3.5.0]
    at weblogic.jms.client.JMSSession.createDurableSubscriber(JMSSession.java:2487) ~[weblogic.jar:10.3.5.0]
    at weblogic.jms.client.WLSessionImpl.createDurableSubscriber(WLSessionImpl.java:1233) ~[weblogic.jar:10.3.5.0]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createConsumer(AbstractPollingMessageListenerContainer.java:493) ~[spring-full.jar:3.0.5]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:223) ~[spring-full.jar:3.0.5]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:307) ~[spring-full.jar:3.0.5]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243) ~[spring-full.jar:3.0.5]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059) [spring-full.jar:3.0.5]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050) [spring-full.jar:3.0.5]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947) [spring-full.jar:3.0.5]
    at java.lang.Thread.run(Thread.java:662) [na:1.6.0_24]

So even though I defined a clientId within the configuration, as the Javadoc for DefaultMessageListenerContainer allows, I get hit with a clientID issue. The ActiveMQ discussion mentioned earlier gives some more information on the importance of a clientID:

So for durable topic subscription, the JMS provider needs to be able to identify S when it shuts down and later on in the future reconnects, so it can know what messages to send to it while it was not running. JMS specification dictates that the identification of S is done by a combination of the clientID and the durable subscriber name. This is so that the JMS connection S uses can have many different durable subscriptions on different topics or on the same topic with different selectors – yet the JMS provider can know which message for which subscription to keep around for it.

So setting the clientID on a JMS connection is vital (along with using a sensible durable consumer name) for durable topic subscription.

Based on this information, it appears that I need to have a clientId registered on the ConnectionFactory and not on the DefaultMessageListenerContainer (even though the API seems to allow it, it does not work with the configuration I provided). After quite a bit of searching, I found an article, which I can’t seem to locate now, that pointed me towards SingleConnectionFactory. It allows me to wrap the ConnectionFactory that I am using and supply a clientId on the wrapper. Then I just use the bean id of the SingleConnectionFactory in the configuration for my DefaultMessageListenerContainer.

<bean id="sampleDurableTopicConnectionFactory"
    class="org.springframework.jms.connection.SingleConnectionFactory"
    p:targetConnectionFactory-ref="sampleMessagingConnectionFactory"
    p:clientId="sampleSubscribeService"/>

<bean id="sampleSubscribeServiceListenerContainer"  
    class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    p:connectionFactory-ref="sampleDurableTopicConnectionFactory"
    p:destinationName="com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:messageListener-ref="sampleSubscribeServiceMessageListener"
    p:pubSubDomain="true"
    p:subscriptionDurable="true"
    p:durableSubscriptionName="sampleSubscribeService"
    p:transactionManager-ref="GlobalDataTransactionManager"/>
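
As an aside, there may be a way to keep the clientId on the listener container itself. Going by the Spring Javadoc, the container’s clientId is applied to a shared Connection that the container creates, and when an external transaction manager is set the container defaults to a cache level of CACHE_NONE, so no shared Connection is created. The following is an untested sketch under that assumption, not something I deployed; whether a cached connection is acceptable with the transaction manager in use would need to be verified.

```xml
<!-- Untested alternative: cache the connection so the container creates a
     shared Connection and can apply its own clientId to it. -->
<bean id="sampleSubscribeServiceListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    p:connectionFactory-ref="sampleMessagingConnectionFactory"
    p:destinationName="com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:messageListener-ref="sampleSubscribeServiceMessageListener"
    p:pubSubDomain="true"
    p:subscriptionDurable="true"
    p:durableSubscriptionName="sampleSubscribeService"
    p:clientId="sampleSubscribeService"
    p:cacheLevelName="CACHE_CONNECTION"
    p:transactionManager-ref="GlobalDataTransactionManager"/>
```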

So I passed our JMS ConnectionFactory into the configuration of the SingleConnectionFactory and moved the clientId configuration to it as well. Then I passed the sampleDurableTopicConnectionFactory in as the connectionFactory for my DefaultMessageListenerContainer. I deployed this new web application and, to my surprise, it still didn’t work. I got the following error, which is the same error I get if I connect to a Uniform Distributed Topic (WLS) with HermesJMS (the trace shown here is the one from HermesJMS):

weblogic.jms.common.JMSException: [JMSClientExceptions:055030]This topic does not support durable subscriptions
    at weblogic.jms.client.JMSSession.createDurableSubscriber(JMSSession.java:2482)
    at weblogic.jms.client.WLSessionImpl.createDurableSubscriber(WLSessionImpl.java:1233)
    at hermes.impl.TopicBrowser.getEnumeration(TopicBrowser.java:220)
    at hermes.browser.tasks.BrowseDestinationTask.invoke(BrowseDestinationTask.java:146)
    at hermes.browser.tasks.TaskSupport.run(TaskSupport.java:175)
    at hermes.browser.tasks.ThreadPool.run(ThreadPool.java:170)
    at java.lang.Thread.run(Unknown Source)

I mentioned this in a previous blog post, but with WebLogic Uniform Distributed Topics you have to do special configuration within a Subscribe Service in order to use a Durable Subscriber. WebLogic does not allow a Durable Subscriber against a Uniform Distributed Topic unless you point to one of the specific members by including a single JMS Server in the JNDI URL, like jmsServer1@/com/foo/SampleBasicTopic. The reason is explained in the WebLogic Server Documentation:

Note: Durable subscribers (DurableTopicSubscriber) cannot be created for distributed topics. However, you can still create a durable subscription on distributed topic member and the other topic members will forward the messages to the topic member that has the durable subscription.

So in order to make this work, I simply need to modify the destinationName to include the JMS Server name:

<bean id="sampleDurableTopicConnectionFactory"
    class="org.springframework.jms.connection.SingleConnectionFactory"
    p:targetConnectionFactory-ref="sampleMessagingConnectionFactory"
    p:clientId="sampleSubscribeService"/>

<bean id="sampleSubscribeServiceListenerContainer"  
    class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    p:connectionFactory-ref="sampleDurableTopicConnectionFactory"
    p:destinationName="jmsServer1@com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:messageListener-ref="sampleSubscribeServiceMessageListener"
    p:pubSubDomain="true"
    p:subscriptionDurable="true"
    p:durableSubscriptionName="sampleSubscribeService"
    p:transactionManager-ref="GlobalDataTransactionManager"/>

I rebuild my WAR and deploy, and luckily this time it works. I now have a durable subscriber. The interesting part is that as I start clicking around the WebLogic console and look at the configured ConnectionFactory, I notice a Client Id configuration.

With this newly discovered piece of information, I find some more detail that clarifies the role of the Client Id in Developing Advanced Pub/Sub Applications:

A subscription key is used to uniquely identify a subscription. For non-durable subscriptions, the key is composed of the Client ID and Client ID Policy. For durable subscriptions, the key is composed of the Client ID, Client ID Policy, and Subscription Name.

So this clarifies that I should just be able to set the Client Id on the ConnectionFactory within WebLogic and remove the SingleConnectionFactory from my Spring configuration. I went ahead and modified my ConnectionFactory by setting its Client Id to “sampleSubscribeService”, removed the SingleConnectionFactory configuration from my Spring context file, and rebuilt and redeployed my web application. My subscribe service works with this configuration as well. I can even go into the WebLogic Server console, click on my JMS Resource (the Topic), go to Monitoring, and see a “Durable Subscribers” tab with the subscription name and client id that I defined.
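
For reference, here is roughly what the Spring configuration looks like once the Client Id lives on the WebLogic ConnectionFactory. This is a sketch reconstructed from the description above rather than a copy of the deployed file; it assumes the container goes back to referencing sampleMessagingConnectionFactory directly and still points at the jmsServer1 member of the distributed topic.

```xml
<!-- No SingleConnectionFactory wrapper and no clientId here: the Client Id is
     assumed to be set on the WebLogic ConnectionFactory behind
     sampleMessagingConnectionFactory. -->
<bean id="sampleSubscribeServiceListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    p:connectionFactory-ref="sampleMessagingConnectionFactory"
    p:destinationName="jmsServer1@com/foo/dest/SampleTopic"
    p:destinationResolver-ref="sampleDestinationResolver"
    p:messageListener-ref="sampleSubscribeServiceMessageListener"
    p:pubSubDomain="true"
    p:subscriptionDurable="true"
    p:durableSubscriptionName="sampleSubscribeService"
    p:transactionManager-ref="GlobalDataTransactionManager"/>
```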
