Pages

Wednesday, June 25, 2008

JMS with activeMQ

Now that we know basics of ActiveMQ, it better to know JMS too. There are a lot of resources on web for learning JMS, the best one being the sun's JMS tutorial that ships along the J2EE tutorial. So, I wont put too many details in this JMS sample code. The code is self explanatory, and is given just to continue upon the prev post on ActiveMQ. I will be using the JNDI properties file explained in the prev post for publishing and consuming messages.

The jndi.properties is as follows:

# START SNIPPET: jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

# use the following property to specify the JNDI name the connection factory
# should appear as.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = example.MyQueue


# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = example.MyTopic

# END SNIPPET: jndi

This property file should be in the class path of the application. The queue is created dynamically.

JMS Producer


public class JMSProducer implements Runnable{
private static final Log LOG = LogFactory.getLog(JMSProducer.class);

public JMSProducer() {
}

//Run method implemented to run this as a thread.
public void run(){
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
String destinationName = null;
final int numMsgs;
destinationName = "MyQueue";
numMsgs = "5";
LOG.info("Destination name is " + destinationName);

/*
* Create a JNDI API InitialContext object
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
LOG.info("Could not create JNDI API context: " + e.toString());
System.exit(1);
}

/*
* Look up connection factory and destination.
*/
try {
connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory");
destination = (Destination)jndiContext.lookup(destinationName);
} catch (NamingException e) {
LOG.info("JNDI API lookup failed: " + e);
System.exit(1);
}

/*
* Create connection. Create session from connection; false means
* session is not transacted.create producer, set the text message, set the co-relation id and send the message.
*/
try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
for (int i = 0; i < numMsgs; i++) {
message.setText("A message is being sent with number " + (i + 1));
message.setJMSCorrelationID("" + i);
LOG.info("Sending message: " + message.getText());
producer.send(message);
}

} catch (JMSException e) {
LOG.info("Exception occurred: " + e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
}
}
}
}


}


JMS Consumer

public class JMSConsumer implements Runnable{
private static final Log LOG = LogFactory.getLog(JMSConsumer.class);

public void run() {
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
Destination destination = null;
String sourceName = null;
final int numMsgs;
sourceName= "MyQueue";
numMsgs = "1";
LOG.info("Source name is " + sourceName);
/*
* Create a JNDI API InitialContext object
*/
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
LOG.info("Could not create JNDI API context: " + e.toString());
System.exit(1);
}

/*
* Look up connection factory and destination.
*/
try {
connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory");
destination = (Destination)jndiContext.lookup(sourceName);
} catch (NamingException e) {
LOG.info("JNDI API lookup failed: " + e);
System.exit(1);
}


try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
connection.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
MessageListener listener = new MyQueueMessageListener();
consumer.setMessageListener(listener );
//Let the thread run for some time so that the Consumer has suffcient time to consume the message
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (JMSException e) {
LOG.info("Exception occurred: " + e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
}
}
}
}

}

MessageListener

public class MyQueueMessageListener implements MessageListener {

/**
*
*/
public MyQueueMessageListener() {
// TODO Auto-generated constructor stub
}

/* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
* This is called on receving of a text message.
*/
public void onMessage(Message arg0) {
if(arg0 instanceof TextMessage){
try {
//Print it out
System.out.println("Recieved message in listener: " + ((TextMessage)arg0).getText());

System.out.println("Co-Rel Id: " + ((TextMessage)arg0).getJMSCorrelationID());
try {
//Log it to a file
BufferedWriter outFile = new BufferedWriter(new FileWriter("MyQueueConsumer.txt"));
outFile.write("Recieved message in listener: " + ((TextMessage)arg0).getText());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
System.out.println("~~~~Listener : Error in message format~~~~");
}

}

}

Application to run Consumer and producer

public class SimpleApp {

//Run the producer first, then the consumer
public static void main(String[] args) throws Exception {
runInNewthread(new JMSProducer());
runInNewthread(new JMSConsumer());
}

public static void runInNewthread(Runnable runnable) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(false);
brokerThread.start();
}

}

No comments:

Post a Comment