JMS
Java EE 5 Tutorial
JBoss 4 JMS configuration
JBoss 4 guide
JBoss configuration
JMS generic message ordering with property and filter. Would need to use durable subscriptions for recovery upon failure or restart.
Create destinations:
In <jboss-home>/server/<config>/deploy/<xxx_service.xml>
TopicPublisher:
JBoss 4 JMS configuration
JBoss 4 guide
JBoss configuration
JMS generic message ordering with property and filter. Would need to use durable subscriptions for recovery upon failure or restart.
JNDI name ConnectionFactory
Old
// Old style: everything is resolved by hand through JNDI using the
// queue-specific interfaces.
// Look up the connection factory bound under the JNDI name "ConnectionFactory".
InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
// NOTE(review): conn, que and session are not declared in this snippet;
// presumably they are fields of the enclosing class - confirm.
conn = qcf.createQueueConnection();
// Look up the destination bound under the JNDI name "queue/testQueue".
que = (Queue) iniCtx.lookup("queue/testQueue");
// Non-transacted session (first argument false) with automatic acknowledgement.
session = conn.createQueueSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
conn.start();
New
// New style: the connection factory is injected via @Resource instead of
// being looked up by hand, and the unified Connection/Session/MessageProducer
// interfaces are used.
@Resource(mappedName="jms/ConnectionFactory")
private static ConnectionFactory connectionFactory;
Connection connection = connectionFactory.createConnection();
// The next two lines are alternatives - use one of them, not both:
// a non-transacted, auto-acknowledging session, or a transacted session.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, 0); //transacted
// The next three lines are alternatives as well: the producer is created the
// same way whether the argument is named dest, queue or topic.
MessageProducer producer = session.createProducer(dest);
MessageProducer producer = session.createProducer(queue);
MessageProducer producer = session.createProducer(topic);
// Build a text message, send it, then close the connection.
TextMessage message = session.createTextMessage();
message.setText(msg_text); // msg_text is a String
producer.send(message);
connection.close();
// A destination can be injected the same way as the connection factory.
@Resource(mappedName="jms/Topic")
private static Topic topic;
Create destinations:
In <jboss-home>/server/<config>/deploy/<xxx_service.xml>
<server>
  <!-- Declares a queue MBean named someQueueName. The depends element names
       the DestinationManager service this destination relies on. -->
  <mbean code="org.jboss.mq.server.jmx.Queue"
         name="jboss.mq.destination:service=Queue,name=someQueueName">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  </mbean>
  <!-- Declares a topic MBean named someTopicName. The name attribute is kept
       on a single line so that no whitespace ends up inside the object name. -->
  <mbean code="org.jboss.mq.server.jmx.Topic"
         name="jboss.mq.destination:service=Topic,name=someTopicName">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  </mbean>
</server>
TopicPublisher:
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.NamingException;
import org.jboss.seam.log.Log;
import org.jboss.seam.log.Logging;
import org.jboss.seam.util.Naming;
/**
 * Manages the thread-safe, auto-commit, non-persistent, best-effort publishing
 * of {@link Message}s to a {@link Topic}.
 *
 * <p>
 * Every public method synchronizes on the instance, so a single
 * {@link TopicPublisher} may be shared between threads. Failures are logged
 * rather than thrown: a message that cannot be sent after one
 * re-initialization and retry is dropped.
 *
 * <p>
 * The close method should be explicitly called to close any underlying
 * communication resources when the {@link TopicPublisher} is no longer needed.
 * Publishing after {@link #close()} re-opens the connection, because a missing
 * session or producer is handled like any other publishing failure.
 *
 * @author Troy T. Collinsworth
 */
// In the future some client might want to set the publishing mode - pragmatism.
public class TopicPublisher {
    private static final Log LOG = Logging.getLog(TopicPublisher.class);

    /** Total number of send attempts per message: the initial try plus one retry. */
    private static final int MAX_ATTEMPTS = 2;

    /** 9 is expedited (highest) priority. */
    private static final int PRIORITY_EXPEDITED = 9;

    /** 0 is unlimited TTL. */
    private static final long TTL_UNLIMITED = 0L;

    private final String topicName;
    private Connection connection;
    private Session session;
    private MessageProducer publisher;

    /**
     * Creates the publisher and makes a best-effort attempt to connect to the
     * topic. An initialization failure is logged, not thrown; the connection
     * is attempted again on the first publish.
     *
     * @param topicName
     *            JNDI name under which the topic is looked up
     */
    public TopicPublisher(final String topicName) {
        this.topicName = topicName;
        initPublisher();
    }

    /**
     * Publish the {@link Serializable} object as an {@link ObjectMessage},
     * best-effort.
     *
     * If sending fails, the underlying resources are closed and re-created
     * and the send is retried once. If the retry fails as well, the message is
     * logged and skipped.
     *
     * @param object
     *            payload of the message to publish
     */
    // In the future, some client might want to set message properties -
    // pragmatism.
    public void publishMessage(final Serializable object) {
        synchronized (this) {
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; ++attempt) {
                if (attempt > 1) {
                    LOG.error("Publishing last message failed, retrying");
                }
                if (trySend(object)) {
                    return;
                }
                // Any failure is answered by rebuilding the whole JMS stack,
                // so the next attempt (or the next message) starts clean.
                releaseResources();
                initPublisher();
            }
            LOG.error("Message publishing failed skipping: " + object);
        }
    }

    /**
     * Makes a single send attempt with the current session and producer.
     *
     * @param object
     *            payload of the message to publish
     * @return {@code true} if the message was handed to the producer,
     *         {@code false} if the publisher is not initialized or the send
     *         failed (the failure is logged)
     */
    private boolean trySend(final Serializable object) {
        // An earlier failed initialization or a close() leaves these unset;
        // report that explicitly instead of relying on a NullPointerException.
        if (session == null || publisher == null) {
            LOG.error(String.format("Failed to publish message, TopicPublisher %s is not initialized", topicName));
            return false;
        }
        try {
            final ObjectMessage objectMessage = session.createObjectMessage(object);
            publisher.send(objectMessage, DeliveryMode.NON_PERSISTENT, PRIORITY_EXPEDITED, TTL_UNLIMITED);
            return true;
        } catch (final JMSException e) {
            LOG.error("Failed to publish message", e);
        } catch (final RuntimeException e) {
            LOG.error("Failed to publish message", e);
        }
        return false;
    }

    /**
     * Initializes the producer, best-effort. On failure the error is logged and
     * anything that was already opened is released again, so the publisher is
     * never left half-initialized.
     */
    private void initPublisher() {
        synchronized (this) {
            try {
                final TopicConnectionFactory connectionFactory = (TopicConnectionFactory) Naming.getInitialContext()
                        .lookup("java:/ConnectionFactory");
                connection = connectionFactory.createTopicConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                final Destination destination = (Topic) Naming.getInitialContext().lookup(topicName);
                publisher = session.createProducer(destination);
            } catch (final NamingException e) {
                LOG.error(String.format("Error initializing TopicProducer %s", topicName), e);
                releaseResources();
            } catch (final JMSException e) {
                LOG.error(String.format("Error initializing TopicProducer %s", topicName), e);
                releaseResources();
            }
        }
    }

    /**
     * Closes any underlying communication resources related to the
     * {@link TopicPublisher}. This method should be explicitly called when the
     * {@link TopicPublisher} is no longer needed.
     */
    public void close() {
        synchronized (this) {
            releaseResources();
        }
    }

    /**
     * Closes the producer and the connection and forgets the session. Kept
     * private so that internal callers (including the constructor path) never
     * go through the overridable {@link #close()}.
     */
    private void releaseResources() {
        closeTopicPublisher();
        closeConnection();
        // The session is not closed separately; the reference is dropped so a
        // stale session is never reused after the connection has been closed.
        session = null;
    }

    /** Closes the producer, if any; the reference is cleared even when closing fails. */
    private void closeTopicPublisher() {
        if (publisher == null) {
            return;
        }
        try {
            publisher.close();
        } catch (final JMSException e) {
            LOG.error(String.format("Error closing TopicProducer %s", topicName), e);
        } finally {
            publisher = null;
        }
    }

    /** Closes the connection, if any; the reference is cleared even when closing fails. */
    private void closeConnection() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (final JMSException e) {
            LOG.error(String.format("Error closing connection for TopicProducer %s", topicName), e);
        } finally {
            connection = null;
        }
    }
}
Comments