JMS(Java Message Service)可以提供应用的伸缩性,可以有效地避免服务被压垮。在 Java EE 平台中,为了发送和接收 JMS 消息,必要的步骤有如下七步:
public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
// 1创建 factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
// 2、创建factory
connection = connectionFactory.createConnection();
// 3、创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地
destination = session.createQueue(subject);
// 5.创建消息消生产者
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
/**
* 将传入的String 以TextMessage 格式发送出去
* @param message
* @throws JMSException
* @throws Exception
*/
public void produceMessage(String message) throws JMSException, Exception {
initialize();
// 创建文本消息
TextMessage msg = session.createTextMessage(message);
// 打开连接
connection.start();
// 发送消息
producer.send(msg);
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
?
public class ConsumerTool implements MessageListener {
/**
* JSM 用户
*/
private String user = ActiveMQConnection.DEFAULT_USER;
/**
* JSM 用户密码
*/
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
/**
* JSM 主题
*/
private String subject = "TOOL.DEFAULT";
/**
* 目的地
*/
private Destination destination = null;
/**
* 连接
*/
private Connection connection = null;
/**
* 会话
*/
private Session session = null;
/**
* 消息消费者
*/
private MessageConsumer consumer = null;
/**
* 初始化方法
* 1、通过user ,password,url 创建 connenctionFactory
* 2、通过connectionFactory 创建session
* 3、通过session创建目的地
* 5、创建指定主题消息的消费者
* @throws JMSException
* @throws Exception
*/
private void initialize() throws JMSException, Exception {
// 1、创建Factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
// 2、创建 connection
connection = connectionFactory.createConnection();
// 3、 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建 目的地
destination = session.createQueue(subject);
// 5、创建消息消费者
consumer = session.createConsumer(destination);
}
/**
* 消费消息方法,向消息消费者添加消息到达监听器
* @throws JMSException
* @throws Exception
*/
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
// 增加消息监听器
consumer.setMessageListener(this);
}
/**
* close 关闭
* @throws JMSException
*/
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
/**
* 接收到消息:sendTo:13919306243;content:hello!
* 当接收到消息的处理方法。将接收到的信息按照指定格式截取组装成SMS消息发送出去。
*/
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
String[] msgs = FileUtil.splitMessage(msg);
try {
if (null != msgs && !msgs[1].equals("")) {
for (String s : FileUtil.buildContent(msgs[1])) {
SMSSender.getInstance().send(
new OutboundMessage(msgs[0], s));
}
}
} catch (GatewayException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

可以发现,实现一个简单的消息发送需要许多编码。Spring 提供了一个基于模板的解决方案,用于简化 JMS 消息实现的代码。

<?xml version="1.0" encoding="UTF-8"?>
<!--
Spring application context for the JMS demo: declares the ActiveMQ
connection factory, a JmsTemplate, one queue destination and one topic.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:task="http://www.springframework.org/schema/task" xmlns:util="http://www.springframework.org/schema/util"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://cxf.apache.org/jaxws
http://cxf.apache.org/schemas/jaxws.xsd
">
<!-- Annotation-driven setup: AspectJ auto-proxying, annotation processing and component scanning. -->
<!-- NOTE(review): the base-package value ends in ".*"; confirm it matches the packages that hold the @Component classes. -->
<aop:aspectj-autoproxy />
<context:annotation-config />
<context:component-scan base-package="com.cathy.demo.jms.*" />
<!-- Connection factory: ActiveMQ broker reached over TCP on localhost:61616. -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- JmsTemplate: uses the connection factory above, defaults to the "destination" queue bean, receiveTimeout set to 60000 (milliseconds per the JmsTemplate documentation). -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="receiveTimeout" value="60000"/>
</bean>
<!-- Queue destination named "message.queue". -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="message.queue"/>
</bean>
<!-- Topic named "notifyTopic". -->
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="notifyTopic"/>
</bean>
</beans>
?
/**
 * Spring-managed {@link Producer} that sends a fixed map message through
 * the injected {@link JmsTemplate}.
 *
 * @author zhangwei_david
 * @version $Id: ProducerImpl.java, v 0.1 2015-01-31 20:25:36 zhangwei_david Exp $
 */
@Component
public class ProducerImpl implements Producer {
    /** Template used to send the message. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /** Destination the message is sent to. */
    @Autowired
    private Destination destination;

    /**
     * Sends one {@code MapMessage} containing the entry {@code "key" -> "test"}
     * to the injected destination.
     */
    public void send() {
        // Callback that builds the message on the session supplied by the template.
        MessageCreator creator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage payload = session.createMapMessage();
                payload.setString("key", "test");
                return payload;
            }
        };
        jmsTemplate.send(destination, creator);
    }
}
?
/**
 * Spring-managed {@link Receiver} that takes one message from the injected
 * destination through {@link JmsTemplate} and prints it.
 *
 * @author zhangwei_david
 * @version $Id: ReceiverImpl.java, v 0.1 2015-01-31 20:53:49 zhangwei_david Exp $
 */
@Component
public class ReceiverImpl implements Receiver {
    /** Template used to receive the message. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /** Destination the message is received from. */
    @Autowired
    private Destination destination;

    /**
     * Receives a single message, treats it as a {@code MapMessage} and prints
     * it to standard output; does nothing when the template returns
     * {@code null}.
     *
     * @see com.cathy.demo.jms.receiver.Receiver#receive()
     */
    public void receive() {
        Message received = jmsTemplate.receive(destination);
        MapMessage asMap = (MapMessage) received;
        if (asMap == null) {
            return;
        }
        System.out.println(asMap);
    }
}
?
/**
 * Spring-backed JUnit test that exercises the injected {@link Producer} and
 * {@link Receiver} beans loaded from {@code META-INF/spring/jms-beans.xml}.
 *
 * @author zhangwei_david
 * @version $Id: Sender.java, v 0.1 2015-01-31 20:47:18 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml")
public class Sender {

    /** Producer bean under test. */
    @Autowired
    private Producer producer;

    /** Receiver bean under test. */
    @Autowired
    private Receiver receiver;

    /** Invokes {@code send()} on the producer; makes no assertions. */
    @Test
    public void testSend() {
        this.producer.send();
    }

    /** Invokes {@code receive()} on the receiver; makes no assertions. */
    @Test
    public void testReceive() {
        this.receiver.receive();
    }
}
?
原文:http://zhangwei-david.iteye.com/blog/2182155