1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | <!--activemq Begin--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <!-- <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>${spring.version}</version> </dependency>--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.0</version> </dependency> <!--activemq End--> |
2、activemq的配置文件:spring-jms.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | <!-- 启用spring mvc 注解 --> <context:component-scan base-package="org.soa.test.activemq"/> <!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!--解决接收消息抛出异常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke--> <property name="trustAllPackages" value="true"/> <!-- 是否异步发送 --> <property name="useAsyncSend" value="true" /> </bean> <!-- Queue模式 Begin --> <!-- 定义消息队列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>defaultQueueName</value> </constructor-arg> </bean> <!-- 配置JMS模板,Spring提供的JMS工具类,它发送、接收消息。(Queue) --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="pubSubDomain" value="false"/> <!--接收超时时间--> <!--<property name="receiveTimeout" value="10000" />--> </bean> <!-- Queue模式 End --> |
三、队列发送端及测试程序
1、发送代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | package org.soa.test.activemq.queues;import org.soa.test.activemq.StudentInfo;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import java.util.List;/** * Created by JamesC on 16-9-22. */@Componentpublic class ProduceMsg { @Autowired private JmsTemplate jmsTemplate; /** * 向指定队列发送消息 */ public void sendMessage(Destination destination, final String msg) { System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 向默认队列发送消息(默认队列名称在bean:queueDestination配置) */ public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向队列" + destination + "发送了消息------------" + msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 向默认队列发送消息 */ public void sendMessageConvertAndSend(final Object msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向队列" + destination + "发送了消息------------" + msg); //使用内嵌的MessageConverter进行数据类型转换,包括xml(JAXB)、json(Jackson)、普通文本、字节数组 jmsTemplate.convertAndSend(destination, msg); } /** * 向指定队列发送消息 */ public void sendStudentInfo(Destination destination, final StudentInfo msg) { System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(msg); } }); }} |
2、测试程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | package org.soa.test.activemq.queues;import com.alibaba.fastjson.JSON;import org.apache.activemq.command.ActiveMQQueue;import org.junit.Test;import org.junit.runner.RunWith;import org.soa.test.activemq.StudentInfo;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.jms.Destination;import java.util.Date;/** * Created by JamesC on 16-9-22. */@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/spring-jms.xml")public class ProduceMsgTest extends AbstractJUnit4SpringContextTests { @Autowired protected ApplicationContext ctx; /** * 队列名queue1 这里使用jms配置文件中的数据 */ @Autowired private Destination queueDestination; /** * 队列消息生产者 */ @Autowired private ProduceMsg produceMessage; //向默认队列发消息(文本) @Test public void produceMsg_DefaultQueue() { String msg = "这里是向默认队列发送的消息" + new Date().toString(); produceMessage.sendMessage(msg); } //向默认队列发消息(Json字符串) @Test public void produceMsg_Json() { StudentInfo info = new StudentInfo(); info.setId(1); info.setStdName("李磊"); info.setStdNo("001"); info.setEnterDate(new Date()); //队列存放的是时间戳 String alibabaJson = JSON.toJSONString(info); produceMessage.sendMessage(alibabaJson); } //向默认队列发消息(使用convertAndSend发送对象) @Test public void produceMsg_ConvertAndSend() { StudentInfo info = new StudentInfo(); info.setId(1); info.setStdName("李磊"); info.setStdNo("001"); info.setEnterDate(new Date()); produceMessage.sendMessageConvertAndSend(info); } //向指定队列发消息(文本) @Test public void produceMsg_CustomQueue() { for (int i = 0; i < 20; i++) { ActiveMQQueue myDestination = new ActiveMQQueue("queueCustom"); produceMessage.sendMessage(myDestination, "----发送消息给queueCustom"); } } //向指定队列发消息(队列名称从XML读取) @Test public void produceMsg_XmlQueue() { for (int i = 0; i < 20; i++) { ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean("queueDestination"); produceMessage.sendMessage(destinationQueue, "----send my msg to queueXml"); } } //向指定队列发消息(发送对象) @Test public void produceMsg_StudentInfo() { StudentInfo info = new StudentInfo(); info.setId(1); info.setStdName("李磊"); info.setStdNo("001"); info.setEnterDate(new Date()); ActiveMQQueue destination = new ActiveMQQueue("StudentInfo"); produceMessage.sendStudentInfo(destination, info); }} |
四、队列消费端及测试程序
1、消费代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | package org.soa.test.activemq.queues;import org.soa.test.activemq.StudentInfo;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.support.JmsUtils;import org.springframework.stereotype.Component;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.ObjectMessage;import javax.jms.TextMessage;/** * Created by JamesC on 16-9-22. */@Componentpublic class ConsumeMsg { @Autowired private JmsTemplate jmsTemplate; /** * 接受消息 */ public String receive(Destination destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); String msg = ""; try { msg = tm.getText(); System.out.println("从队列" + destination.toString() + "收到了消息:\t" + msg); } catch (JMSException e) { e.printStackTrace(); return ""; } return msg; } /** * 接受消息 */ public StudentInfo receiveStudentInfo() { try { String destination = jmsTemplate.getDefaultDestination().toString(); ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination); return (StudentInfo)msg.getObject(); } catch (JMSException e) { //检查性异常转换为非检查性异常 throw JmsUtils.convertJmsAccessException(e); } } /** * 接受消息 */ public Object receiveConvertAndReceive() { String destination = jmsTemplate.getDefaultDestination().toString(); Object msg = jmsTemplate.receiveAndConvert(destination); return msg; }} |
2、测试程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | package org.soa.test.activemq.queues;import org.apache.activemq.command.ActiveMQQueue;import org.junit.Test;import org.junit.runner.RunWith;import org.soa.test.activemq.StudentInfo;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/** * Created by JamesC on 16-9-22. */@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/spring-jms.xml")public class ConsumeMsgTest { @Autowired private ConsumeMsg consumeMsg; //从指定队列接收消息(文本) @Test public void receiveMsg() { //没有消息阻塞一段时间后会抛异常 //java.lang.NullPointerException ActiveMQQueue destination = new ActiveMQQueue("defaultQueueName"); consumeMsg.receive(destination); } //从指定队列接收消息(StudentInfo对象消息) @Test public void receiveStudentInfo() { StudentInfo msg = consumeMsg.receiveStudentInfo(); System.out.println(msg.getStdName()); } //从指定队列接收消息(Json对象) @Test public void receiveConvertAndReceive() { StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive(); System.out.println(msg.getStdName()); }} |
原文:http://www.cnblogs.com/gossip/p/5970090.html