Embeddable/Distributable JMS

Getting Started with JMS
Apache ActiveMQ
Supported Protocols and URIs
Apache ActiveMQ API
EE 1.4 API
EE 5 API

Round-trip JUnit example code for topic and queue producers and consumers


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;


public class TestMessaging {

private final String host = "localhost";
private final int port = 61616;
private final String brokerName = host + port;
private final String connectionUrl = "tcp://" + host + ":" + port;

private BrokerService broker;
private QueueConnection qConn;
private TopicConnection tConn;

@Before
public void setUp() throws Exception {

broker = new BrokerService();
broker.setBrokerName(brokerName);
broker.addConnector(connectionUrl);
broker.start();

final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUrl);
assertNotNull(cf);
qConn = cf.createQueueConnection();
qConn.start();
assertNotNull(qConn);
tConn = cf.createTopicConnection();
tConn.start();
assertNotNull(tConn);
}

@After
public void tearDown() throws Exception {

qConn.stop();
qConn.close();
tConn.stop();
tConn.close();
broker.stop();
}

@Test
public void testTopicMessaging() throws JMSException, InterruptedException {

final TopicSession topicSession = (TopicSession) tConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//final Topic testTopic = topicSession.createTemporaryTopic();
final Topic testTopic = topicSession.createTopic("test topic");

final TopicSubscriber topicSubscriber = topicSession.createSubscriber(testTopic);
final TestMessageListener listener = new TestMessageListener();
topicSubscriber.setMessageListener(listener);

final TopicPublisher topicPublisher = topicSession.createPublisher(testTopic);

final MapMessage mapMessage = topicSession.createMapMessage();
mapMessage.setStringProperty("foo", "bar");
mapMessage.setString("bar", "baz");

topicPublisher.publish(mapMessage);
waitForMessageDeliveryOrTimeout(listener);

assertNotNull(listener.message);
assertEquals(mapMessage.getJMSMessageID(), listener.message.getJMSMessageID());

listener.message = null;
topicPublisher.send(mapMessage);
waitForMessageDeliveryOrTimeout(listener);

assertNotNull(listener.message);
assertNotSame(mapMessage, listener.message);
assertEquals(mapMessage.getJMSMessageID(), listener.message.getJMSMessageID());
}

@Test
public void testQueueMessaging() throws JMSException, InterruptedException {

final QueueSession queueSession = (QueueSession) qConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//final Topic testTopic = topicSession.createTemporaryTopic();
final Queue testQueue = queueSession.createQueue("test queue");

final QueueReceiver queueReceiver = queueSession.createReceiver(testQueue);
final TestMessageListener listener = new TestMessageListener();
queueReceiver.setMessageListener(listener);

final QueueSender queueSender = queueSession.createSender(testQueue);

final TextMessage textMessage = queueSession.createTextMessage();
textMessage.setStringProperty("foo", "bar");
textMessage.setText("Some status message");

queueSender.send(textMessage);
waitForMessageDeliveryOrTimeout(listener);

assertNotNull(listener.message);
assertEquals(textMessage.getJMSMessageID(), listener.message.getJMSMessageID());
assertEquals(textMessage.getText(), ((TextMessage) listener.message).getText());
// System.out.println(((TextMessage) listener.message).getText());
}

private void waitForMessageDeliveryOrTimeout(TestMessageListener listener) throws InterruptedException {

final long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < 1000 && listener.message == null) {
Thread.currentThread().sleep(100);
}
}

public class TestMessageListener implements MessageListener {

public Message message;

@Override
public void onMessage(final Message message) {

// System.out.println(message.toString());
this.message = message;
}
}
}

Comments

Popular posts from this blog

Sites, Newsletters, and Blogs

Oracle JDBC ReadTimeout QueryTimeout

Locks held on Oracle for hours after sessions abnormally terminated by node failure