JMS Topic using Spring and Active MQ
In our previous JMS posts, We learnt about creating JMS Queue, sending message and receiving message from Queue using Active MQ Broker. In this post, we are going to learn how to implement topic using Spring for Active MQ broker.
As we know from our previous post, A JMS client is an application that uses the services of the message broker. There are two types of clients, Consumer and Producer. Destinations are the place where message get stored for clients. They can be either queues or topics.
In publish/subscribe model, client which produces message is called Publisher and client which consumes message is known as Subscriber.
Topic is a particular destination where Publisher publishes messages. Subscribers subscribe to topic to consume messages. More than one Subscribers can subscribe to same topic and a message can be consumed by many subscribers.
We are going to divide our publisher and subscriber implementation in two part. First we will learn how to create topic and publish a message [XML as String] into topic.
Apache MQ fully supports Spring for configuration of client and message broker. So we will leverage <amq:> tag in our implementation.
As usual we will create Spring JMS template configuration in our spring context file by following these three steps.
Step 1: Configure Connection Factory
Step 2: Configure JMS destination
Step 3: Configure a JMS Template bean Step 4: Define producer bean
Note : JmsTemplate is designed for use with Java EE containers which provide connection pooling capabilities as standardized by the Java EE specifications. So in Non J2EE container, every call to the JmsTemplate.send() method creates and destroys all the JMS resources (connections, consumers, and producers).
So, In non J2EE container, You should use a pooled connection factory for sending messages with JmsTemplate.
As we already know, our producer context xml file will look like this.
appProdContext.xml
We will define our producer bean and leverage Spring JMS template to send message to destination topic.
MessageProducerBean.java
To send a xml message to destination topic, we will use a simple main class ProducerTest.
ProducerTest.java
Our MessageObject class will look like this
MessageObject.java
As we configured publisher, we will configure subscriber in our appConsumerContext.xml file.
Step 1: Configure Connection Factory
Step 2: Declare OXM JAXB Marseller bean
Step 3: Declare Message Converter bean
We register the MarshallingMessageConverter to use the JAXB2 marshaller for both marshaller abd unmarshaller. This converter bean will be used by spring message listener container to convert incoming message which contains XML into MessageObject using Spring OXM JAXB.
Step 4: Declare Consumer bean
Step 5: Define JMS Listener
Message listener container is used to receive messages from a JMS message queue/topic. Here we are creating a message listener container which is using consumer bean reference to delegate messages on onMessage() method.
So our final appConsumerContext.xml will look like
appConsumerContext.xml
After having this configuration file, Now its time to look at our Message Converter class and message consumer class.
MyMarshallingMessageConverter.java
We will use this simple ConsumerTest class to initialize the application context in main method to listen to topic.
ConsumerTest.java
- JMS using ActiveMQ
- JMS using Spring and ActiveMQ
- JMS Message Converter Using Spring
- Message Driven POJO using Spring and ActiveMQ
As we know from our previous post, A JMS client is an application that uses the services of the message broker. There are two types of clients, Consumer and Producer. Destinations are the place where message get stored for clients. They can be either queues or topics.
In publish/subscribe model, client which produces message is called Publisher and client which consumes message is known as Subscriber.
Topic is a particular destination where Publisher publishes messages. Subscribers subscribe to topic to consume messages. More than one Subscribers can subscribe to same topic and a message can be consumed by many subscribers.
We are going to divide our publisher and subscriber implementation in two part. First we will learn how to create topic and publish a message [XML as String] into topic.
Apache MQ fully supports Spring for configuration of client and message broker. So we will leverage <amq:> tag in our implementation.
As usual we will create Spring JMS template configuration in our spring context file by following these three steps.
Step 1: Configure Connection Factory
Step 2: Configure JMS destination
Step 3: Configure a JMS Template bean Step 4: Define producer bean
Note : JmsTemplate is designed for use with Java EE containers which provide connection pooling capabilities as standardized by the Java EE specifications. So in Non J2EE container, every call to the JmsTemplate.send() method creates and destroys all the JMS resources (connections, consumers, and producers).
So, In non J2EE container, You should use a pooled connection factory for sending messages with JmsTemplate.
Apache MQ pointed this problem as JmsTemplate gotchas
As we already know, our producer context xml file will look like this.
appProdContext.xml
We will define our producer bean and leverage Spring JMS template to send message to destination topic.
MessageProducerBean.java
package com.sarf.jms; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.support.JmsGatewaySupport; public class MessageProducerBean extends JmsGatewaySupport{ //Method receives String object and send //it to destination topic as BytesMessage public void sendMessage(final String myMessage) { getJmsTemplate().send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { //Create byte message BytesMessage message = session.createBytesMessage(); message.writeBytes(myMessage.getBytes()); return message; } }); } }
To send a xml message to destination topic, we will use a simple main class ProducerTest.
ProducerTest.java
package com.sarf.main; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.sarf.jms.MessageProducerBean; public class ProducerTest { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("appProdContext.xml"); MessageProducerBean mp = (MessageProducerBean) context.getBean("producer"); String msg = ""; mp.sendMessage(msg); System.out.println("Message sent to destination"); } } skhan "+ "Hello Topic!
---------------------------------------------------------------------------------------------------Now we will focus on implementing our subscriber which will consume XML and convert it to MessageObject using Spring OXM (Object XML Mappers).
Our MessageObject class will look like this
MessageObject.java
package com.sarf.data; import java.io.Serializable; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; @XmlAccessorType(XmlAccessType.FIELD) @XmlRootElement(name = "MessageObject") public class MessageObject implements Serializable{ @XmlElement(name = "mailId") private String mailId; @XmlElement(name = "message") private String message; public String getMailId() { return mailId; } public void setMailId(String mailId) { this.mailId = mailId; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
As we configured publisher, we will configure subscriber in our appConsumerContext.xml file.
Step 1: Configure Connection Factory
Step 2: Declare OXM JAXB Marseller bean
Step 3: Declare Message Converter bean
We register the MarshallingMessageConverter to use the JAXB2 marshaller for both marshaller abd unmarshaller. This converter bean will be used by spring message listener container to convert incoming message which contains XML into MessageObject using Spring OXM JAXB.
Step 4: Declare Consumer bean
Step 5: Define JMS Listener
Message listener container is used to receive messages from a JMS message queue/topic. Here we are creating a message listener container which is using consumer bean reference to delegate messages on onMessage() method.
So our final appConsumerContext.xml will look like
appConsumerContext.xml
After having this configuration file, Now its time to look at our Message Converter class and message consumer class.
MyMarshallingMessageConverter.java
package com.sarf.util; import javax.jms.JMSException; import javax.jms.Message; import org.springframework.jms.support.converter.MarshallingMessageConverter; import org.springframework.jms.support.converter.MessageConversionException; public class MyMarshallingMessageConverter extends MarshallingMessageConverter { @Override public Object fromMessage(Message message) throws JMSException, MessageConversionException{ return super.fromMessage(message); } }MessageConsumerBean.java
package com.sarf.jms; import com.sarf.data.MessageObject; public class MessageConsumerBean { public void onMessage(MessageObject message) { try { System.out.println("Mail # "+message.getMessage()+" received."); } catch (Exception e) { e.printStackTrace(); } } }
We will use this simple ConsumerTest class to initialize the application context in main method to listen to topic.
ConsumerTest.java
package com.sarf.main; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ConsumerTest { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("appConsumerContext.xml"); System.out.println("Consumer listening !!!!!!"); } }