Embeddable/Distributable JMS
Getting Started with JMS
Apache ActiveMQ
Supported Protocols and URIs
Apache ActiveMQ API
EE 1.4 API
EE 5 API
Round trip topic and queue producer and consumer JUnit example code
Apache ActiveMQ
Supported Protocols and URIs
Apache ActiveMQ API
EE 1.4 API
EE 5 API
Round trip topic and queue producer and consumer JUnit example code
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Round-trip JMS tests against an embedded ActiveMQ broker.
 *
 * <p>Before each test a broker is started on {@code tcp://localhost:61616} and one queue
 * connection and one topic connection are opened to it. Each test sends a message to a
 * destination and checks that a listener registered on the same destination receives it.
 */
public class TestMessaging {
    /** Upper bound, in milliseconds, on how long a test waits for a message to arrive. */
    private static final long DELIVERY_TIMEOUT_MILLIS = 1000;
    /** Pause, in milliseconds, between two checks of the listener while waiting. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    private final String host = "localhost";
    private final int port = 61616;
    private final String brokerName = host + port;
    private final String connectionUrl = "tcp://" + host + ":" + port;

    private BrokerService broker;
    private QueueConnection qConn;
    private TopicConnection tConn;

    /**
     * Starts the embedded broker and opens a started queue and topic connection to it.
     *
     * @throws Exception if the broker or either connection cannot be started
     */
    @Before
    public void setUp() throws Exception {
        broker = new BrokerService();
        broker.setBrokerName(brokerName);
        // Disable the persistent message store: a test broker should not carry
        // messages over from one run to the next.
        broker.setPersistent(false);
        broker.addConnector(connectionUrl);
        broker.start();

        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUrl);

        // Check each connection before it is first used, not after.
        qConn = cf.createQueueConnection();
        assertNotNull(qConn);
        qConn.start();

        tConn = cf.createTopicConnection();
        assertNotNull(tConn);
        tConn.start();
    }

    /**
     * Closes both connections and stops the broker.
     *
     * <p>Every step is attempted even if an earlier one throws or {@link #setUp()} failed
     * part-way, so the broker's port is released for the next test.
     *
     * @throws Exception the failure raised while shutting down, if any
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (qConn != null) {
                qConn.stop();
                qConn.close();
            }
        } finally {
            try {
                if (tConn != null) {
                    tConn.stop();
                    tConn.close();
                }
            } finally {
                if (broker != null) {
                    broker.stop();
                }
            }
        }
    }

    /**
     * Publishes a {@link MapMessage} to a topic twice, once via {@code publish} and once via
     * {@code send}, and checks each time that the subscriber receives a message with the same
     * JMS message ID.
     */
    @Test
    public void testTopicMessaging() throws JMSException, InterruptedException {
        final TopicSession topicSession = tConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic testTopic = topicSession.createTopic("test topic");

        // Subscribe before publishing so the listener is registered when the message is sent.
        final TopicSubscriber topicSubscriber = topicSession.createSubscriber(testTopic);
        final TestMessageListener listener = new TestMessageListener();
        topicSubscriber.setMessageListener(listener);

        final TopicPublisher topicPublisher = topicSession.createPublisher(testTopic);
        final MapMessage mapMessage = topicSession.createMapMessage();
        mapMessage.setStringProperty("foo", "bar");
        mapMessage.setString("bar", "baz");

        topicPublisher.publish(mapMessage);
        waitForMessageDeliveryOrTimeout(listener);
        assertNotNull(listener.message);
        assertEquals(mapMessage.getJMSMessageID(), listener.message.getJMSMessageID());

        // Reset the listener so the second delivery is detected independently of the first.
        listener.message = null;
        topicPublisher.send(mapMessage);
        waitForMessageDeliveryOrTimeout(listener);
        assertNotNull(listener.message);
        assertNotSame(mapMessage, listener.message);
        assertEquals(mapMessage.getJMSMessageID(), listener.message.getJMSMessageID());
    }

    /**
     * Sends a {@link TextMessage} to a queue and checks that the receiver gets a message with
     * the same JMS message ID and text.
     */
    @Test
    public void testQueueMessaging() throws JMSException, InterruptedException {
        final QueueSession queueSession = qConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue testQueue = queueSession.createQueue("test queue");

        final QueueReceiver queueReceiver = queueSession.createReceiver(testQueue);
        final TestMessageListener listener = new TestMessageListener();
        queueReceiver.setMessageListener(listener);

        final QueueSender queueSender = queueSession.createSender(testQueue);
        final TextMessage textMessage = queueSession.createTextMessage();
        textMessage.setStringProperty("foo", "bar");
        textMessage.setText("Some status message");

        queueSender.send(textMessage);
        waitForMessageDeliveryOrTimeout(listener);
        assertNotNull(listener.message);
        assertEquals(textMessage.getJMSMessageID(), listener.message.getJMSMessageID());
        assertEquals(textMessage.getText(), ((TextMessage) listener.message).getText());
    }

    /**
     * Blocks until the listener has recorded a message or the delivery timeout has elapsed,
     * whichever comes first. Returns normally in both cases; callers assert on
     * {@code listener.message} afterwards.
     *
     * @param listener the listener whose {@code message} field is polled
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private void waitForMessageDeliveryOrTimeout(TestMessageListener listener) throws InterruptedException {
        final long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < DELIVERY_TIMEOUT_MILLIS && listener.message == null) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Listener that remembers the most recently delivered message. */
    public class TestMessageListener implements MessageListener {
        /**
         * Last message passed to {@link #onMessage(Message)}, or {@code null} if none yet.
         * Declared {@code volatile} because it is written by the thread that delivers the
         * message and read by the test thread that polls for it.
         */
        public volatile Message message;

        @Override
        public void onMessage(final Message message) {
            this.message = message;
        }
    }
}
Comments