Saturday, September 27, 2014

JMS Topic using Spring and ActiveMQ

In our previous JMS posts, we learned how to create a JMS queue and how to send and receive messages through it using the ActiveMQ broker. In this post, we will learn how to implement a topic using Spring with the ActiveMQ broker.



As we know from our previous post, a JMS client is an application that uses the services of the message broker. There are two types of clients: consumers and producers. Destinations are the places where messages are stored for clients. They can be either queues or topics.

In the publish/subscribe model, the client that produces messages is called the publisher, and the client that consumes them is known as the subscriber.
A topic is a destination to which a publisher publishes messages. Subscribers subscribe to the topic to consume messages. More than one subscriber can subscribe to the same topic, and each message can be consumed by many subscribers.
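
To make the fan-out behaviour concrete, here is a minimal in-memory sketch in plain Java. It is not JMS or ActiveMQ code: the InMemoryTopic and InMemoryTopicDemo classes are hypothetical names used only for illustration, and the sketch ignores everything a real broker does (networking, persistence, durable subscriptions). It only shows that every subscriber registered on a topic receives its own copy of a published message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a topic; not part of JMS or ActiveMQ.
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Register a subscriber callback on this topic.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver one copy of the message to every current subscriber.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class InMemoryTopicDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        topic.subscribe(m -> System.out.println("Subscriber A got: " + m));
        topic.subscribe(m -> System.out.println("Subscriber B got: " + m));
        // Both subscribers receive the same message.
        topic.publish("Hello Topic!");
    }
}
```

With a queue, by contrast, each message would be handed to only one of the consumers.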


We are going to divide our publisher and subscriber implementation into two parts. First, we will learn how to create a topic and publish a message [XML as String] to it.

Apache ActiveMQ fully supports Spring for configuring the client and the message broker, so we will leverage the <amq:> tags in our implementation.
As usual, we will create the Spring JMS template configuration in our Spring context file by following these four steps.

Step 1: Configure Connection Factory
Step 2: Configure JMS destination
Step 3: Configure a JMS Template bean
Step 4: Define producer bean

Note: JmsTemplate is designed for use with Java EE containers, which provide connection pooling capabilities as standardized by the Java EE specifications. Outside a Java EE container, every call to the JmsTemplate.send() method creates and destroys all the JMS resources (connections, consumers, and producers).
So, outside a Java EE container, you should use a pooled connection factory when sending messages with JmsTemplate.
The Apache ActiveMQ documentation describes this problem under "JmsTemplate Gotchas".

As we already know, our producer context XML file will look like this.
appProdContext.xml
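
A minimal sketch of such a producer context, following the four steps above, might look like the fragment below. Several details are assumptions rather than the original configuration: the broker URL tcp://localhost:61616, the topic name sample.topic, and every bean id except "producer" (which ProducerTest looks up by name). The pooled factory assumes the activemq-pool module is on the classpath.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://activemq.apache.org/schema/core
         http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- Step 1: connection factory (broker URL is an assumption),
       wrapped in a pooled factory because we run outside a Java EE container -->
  <amq:connectionFactory id="amqConnectionFactory"
                         brokerURL="tcp://localhost:61616"/>

  <bean id="connectionFactory"
        class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
    <property name="connectionFactory" ref="amqConnectionFactory"/>
  </bean>

  <!-- Step 2: JMS destination (topic name is an assumption) -->
  <amq:topic id="destination" physicalName="sample.topic"/>

  <!-- Step 3: JmsTemplate bean sending to the topic by default -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestination" ref="destination"/>
  </bean>

  <!-- Step 4: producer bean; JmsGatewaySupport exposes the jmsTemplate property -->
  <bean id="producer" class="com.sarf.jms.MessageProducerBean">
    <property name="jmsTemplate" ref="jmsTemplate"/>
  </bean>
</beans>
```

Because the default destination is declared as a topic object, the template publishes to the topic without any further publish/subscribe setting.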
We will define our producer bean and leverage the Spring JmsTemplate to send a message to the destination topic.

MessageProducerBean.java
package com.sarf.jms;

import java.nio.charset.StandardCharsets;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.support.JmsGatewaySupport;

public class MessageProducerBean extends JmsGatewaySupport {
  // Receives a String and sends it to the
  // destination topic as a BytesMessage
  public void sendMessage(final String myMessage) {
    getJmsTemplate().send(new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        // Create the bytes message; encode explicitly as UTF-8
        // instead of relying on the platform default charset
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(myMessage.getBytes(StandardCharsets.UTF_8));
        return message;
      }
    });
  }
}

To send an XML message to the destination topic, we will use a simple main class, ProducerTest.
ProducerTest.java
package com.sarf.main;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.sarf.jms.MessageProducerBean;

public class ProducerTest {
  public static void main(String[] args) {
    ApplicationContext context =
        new ClassPathXmlApplicationContext("appProdContext.xml");
    MessageProducerBean mp =
        (MessageProducerBean) context.getBean("producer");
    // XML payload matching the MessageObject JAXB mapping
    String msg = "<MessageObject><mailId>skhan</mailId>" +
        "<message>Hello Topic!</message></MessageObject>";
    mp.sendMessage(msg);
    System.out.println("Message sent to destination");
  }
}
---------------------------------------------------------------------------------------------------
Now we will focus on implementing our subscriber, which will consume the XML and convert it to a MessageObject using Spring OXM (Object/XML Mapping).

Our MessageObject class will look like this
MessageObject.java
package com.sarf.data; 

import java.io.Serializable;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@XmlAccessorType(XmlAccessType.FIELD)
@XmlRootElement(name = "MessageObject")
public class MessageObject implements Serializable {

  @XmlElement(name = "mailId")
  private String mailId;

  @XmlElement(name = "message")
  private String message;

  public String getMailId() {
    return mailId;
  }
  public void setMailId(String mailId) {
    this.mailId = mailId;
  }
  public String getMessage() {
    return message;
  }
  public void setMessage(String message) {
    this.message = message;
  }
}

Just as we configured the publisher, we will configure the subscriber in our appConsumerContext.xml file.

Step 1: Configure Connection Factory
Step 2: Declare OXM JAXB Marshaller bean
Step 3: Declare Message Converter bean

We register the MarshallingMessageConverter to use the JAXB2 marshaller as both marshaller and unmarshaller. The Spring message listener container will use this converter bean to convert an incoming message containing XML into a MessageObject using Spring OXM JAXB.

Step 4: Declare Consumer bean 
Step 5: Define JMS Listener 

A message listener container is used to receive messages from a JMS queue or topic. Here we create a message listener container that uses the consumer bean reference to delegate messages to its onMessage() method.

So our final appConsumerContext.xml will look like this.
appConsumerContext.xml
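
A minimal sketch of such a consumer context, following the five steps above, might look like the fragment below. As before, the broker URL, the topic name sample.topic and all bean ids are assumptions made for illustration, not the original configuration. The topic name must match whatever the producer context uses.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://activemq.apache.org/schema/core
         http://activemq.apache.org/schema/core/activemq-core.xsd
         http://www.springframework.org/schema/jms
         http://www.springframework.org/schema/jms/spring-jms.xsd">

  <!-- Step 1: connection factory (broker URL is an assumption) -->
  <amq:connectionFactory id="connectionFactory"
                         brokerURL="tcp://localhost:61616"/>

  <!-- Step 2: JAXB2 marshaller bound to the MessageObject class -->
  <bean id="jaxbMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
      <list>
        <value>com.sarf.data.MessageObject</value>
      </list>
    </property>
  </bean>

  <!-- Step 3: message converter using the same bean as marshaller and unmarshaller -->
  <bean id="messageConverter" class="com.sarf.util.MyMarshallingMessageConverter">
    <property name="marshaller" ref="jaxbMarshaller"/>
    <property name="unmarshaller" ref="jaxbMarshaller"/>
  </bean>

  <!-- Step 4: consumer bean -->
  <bean id="consumer" class="com.sarf.jms.MessageConsumerBean"/>

  <!-- Step 5: listener container subscribed to the topic (name is an assumption) -->
  <jms:listener-container connection-factory="connectionFactory"
                          destination-type="topic"
                          message-converter="messageConverter">
    <jms:listener destination="sample.topic" ref="consumer" method="onMessage"/>
  </jms:listener-container>
</beans>
```

Note that a non-durable topic subscriber only receives messages published while it is connected, so with a setup like this the consumer should be started before the producer.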
With this configuration file in place, it is time to look at our message converter class and message consumer class.
MyMarshallingMessageConverter.java
package com.sarf.util;

import javax.jms.JMSException;
import javax.jms.Message;
import org.springframework.jms.support.converter.MarshallingMessageConverter;
import org.springframework.jms.support.converter.MessageConversionException;

public class MyMarshallingMessageConverter 
 extends MarshallingMessageConverter {
  @Override
  public Object fromMessage(Message message) 
     throws JMSException, MessageConversionException{
      return super.fromMessage(message);
   }
}
MessageConsumerBean.java
package com.sarf.jms;
import com.sarf.data.MessageObject;

public class MessageConsumerBean {
  // Called by the listener container with the converted MessageObject
  public void onMessage(MessageObject message) {
    try {
      System.out.println("Mail # " + message.getMessage() + " received.");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

We will use this simple ConsumerTest class to initialize the application context in its main method and start listening to the topic.
ConsumerTest.java
package com.sarf.main;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerTest {
 public static void main(String[] args) {
    ApplicationContext context =
     new ClassPathXmlApplicationContext("appConsumerContext.xml");
     System.out.println("Consumer listening !!!!!!");
   }
}
